1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10 PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolDefinition, ToolId, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
30#[derive(Clone, Debug)]
32pub struct SandboxMessage {
33 pub text: String,
34 pub kind: String,
36}
37
38pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
41#[derive(Clone, Default)]
42pub(crate) struct ToolCompletionState {
43 key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
44}
45
46impl ToolCompletionState {
47 fn store(
48 &self,
49 key: crate::AwaitEventKey,
50 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51 let mut guard = self.key.lock().map_err(|_| {
52 crate::RuntimeError::new(
53 "tool_completion_state_poisoned",
54 "tool completion key state lock poisoned",
55 )
56 })?;
57 if let Some(existing) = guard.as_ref() {
58 return Ok(existing.clone());
59 }
60 *guard = Some(key.clone());
61 Ok(key)
62 }
63
64 pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65 self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66 crate::RuntimeError::new(
67 "tool_completion_state_poisoned",
68 "tool completion key state lock poisoned",
69 )
70 })
71 }
72}
73
74#[derive(Clone, Default)]
75pub(crate) struct ToolDurableEffectState {
76 step_ids: Arc<Mutex<HashSet<String>>>,
77 process_event_sequence: Arc<AtomicU64>,
78}
79
80impl ToolDurableEffectState {
81 fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82 let mut guard = self.step_ids.lock().map_err(|_| {
83 crate::RuntimeError::new(
84 "durable_effect_state_poisoned",
85 "durable effect step state lock poisoned",
86 )
87 })?;
88 if !guard.insert(step_id.to_string()) {
89 return Err(crate::RuntimeError::new(
90 "durable_effect_duplicate_step_id",
91 format!("durable effect step id `{step_id}` was already used by this tool call"),
92 ));
93 }
94 Ok(())
95 }
96
97 fn next_process_event_sequence(&self) -> u64 {
98 self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99 }
100}
101
102#[derive(Clone)]
105pub struct ToolContext<'run> {
106 pub(crate) session_id: String,
107 pub(crate) agent_frame_id: crate::AgentFrameId,
108 pub(crate) sessions: Arc<dyn SessionStateService>,
109 pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
110 pub(crate) processes: Arc<dyn crate::ProcessService>,
111 pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
112 pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
113 pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
114 pub(crate) runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
115 pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
116 pub(crate) async_process_id: Option<String>,
117 pub(crate) runtime_process_id: Option<String>,
118 pub(crate) process_events: Option<ToolProcessEventContext>,
119 pub(crate) attachment_store: Arc<dyn AttachmentStore>,
120 pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
121 pub(crate) prepared_payload: serde_json::Value,
122 pub(crate) tool_execution_binding: serde_json::Value,
123 pub(crate) tool_call_id: Option<String>,
125 pub(crate) attempt_number: u32,
126 pub(crate) max_attempts: u32,
127 pub(crate) replay_key: Option<String>,
128 pub(crate) completion: ToolCompletionState,
129 pub(crate) durable_effects: ToolDurableEffectState,
130 pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
131 pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
132 pub(crate) child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
133}
134
135#[derive(Clone)]
136pub struct ToolChildProcessStarted {
137 pub process_id: String,
138 pub child_entry_name: Option<String>,
139}
140
141#[derive(Clone)]
142pub struct ToolChildExecutionTraceHook {
143 on_child_process_started: Arc<dyn Fn(ToolChildProcessStarted) + Send + Sync>,
144}
145
146impl ToolChildExecutionTraceHook {
147 pub fn new(
148 on_child_process_started: impl Fn(ToolChildProcessStarted) + Send + Sync + 'static,
149 ) -> Self {
150 Self {
151 on_child_process_started: Arc::new(on_child_process_started),
152 }
153 }
154
155 pub fn child_process_started(&self, event: ToolChildProcessStarted) {
156 (self.on_child_process_started)(event);
157 }
158}
159
160#[derive(Clone)]
161pub(crate) struct ToolProcessEventContext {
162 process_id: String,
163 registry: Arc<dyn crate::ProcessRegistry>,
164 store: Option<Arc<dyn crate::RuntimePersistence>>,
165 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
166 session_graph: Arc<dyn SessionGraphService>,
167 queued_work_driver: Option<crate::QueuedWorkDriver>,
168}
169
170pub(crate) struct ToolContextBuilder<'run> {
171 session_id: String,
172 agent_frame_id: crate::AgentFrameId,
173 sessions: Arc<dyn SessionStateService>,
174 session_lifecycle: Arc<dyn SessionLifecycleService>,
175 session_graph: Arc<dyn SessionGraphService>,
176 processes: Arc<dyn crate::ProcessService>,
177 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
178 effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
179 runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
180 runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
181 cancellation_token: Option<tokio_util::sync::CancellationToken>,
182 async_process_id: Option<String>,
183 runtime_process_id: Option<String>,
184 process_events: Option<ToolProcessEventContext>,
185 attachment_store: Arc<dyn AttachmentStore>,
186 direct_completions: crate::DirectCompletionClient<'run>,
187 prepared_payload: serde_json::Value,
188 tool_execution_binding: serde_json::Value,
189 tool_call_id: Option<String>,
190 completion: ToolCompletionState,
191 durable_effects: ToolDurableEffectState,
192 parent_invocation: Option<crate::RuntimeInvocation>,
193 execution_env_spec: crate::ProcessExecutionEnvSpec,
194 child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
195}
196
197impl<'run> ToolContextBuilder<'run> {
198 pub(crate) fn from_dispatch(
199 dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
200 ) -> Self {
201 Self {
202 session_id: dispatch.session_id.clone(),
203 agent_frame_id: dispatch.agent_frame_id.clone(),
204 sessions: Arc::clone(&dispatch.sessions),
205 session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
206 session_graph: Arc::clone(&dispatch.session_graph),
207 processes: Arc::clone(&dispatch.processes),
208 process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
209 effect_controller: dispatch.effect_controller.clone(),
210 runtime_dispatch: Some(Arc::clone(&dispatch)),
211 runtime_execution_context: None,
212 cancellation_token: None,
213 async_process_id: None,
214 runtime_process_id: None,
215 process_events: None,
216 attachment_store: Arc::clone(&dispatch.attachment_store),
217 direct_completions: dispatch.direct_completions.clone(),
218 prepared_payload: serde_json::Value::Null,
219 tool_execution_binding: serde_json::Value::Null,
220 tool_call_id: None,
221 completion: ToolCompletionState::default(),
222 durable_effects: ToolDurableEffectState::default(),
223 parent_invocation: dispatch.parent_invocation.clone(),
224 execution_env_spec: dispatch.execution_env_spec.clone(),
225 child_execution_trace_hook: None,
226 }
227 }
228
229 #[cfg(any(test, feature = "testing"))]
230 pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
231 self.tool_call_id = tool_call_id.into();
232 self
233 }
234
235 pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
236 self.tool_call_id = Some(call.call_id.clone());
237 self.prepared_payload = call.prepared_payload.clone();
238 self
239 }
240
241 pub(crate) fn tool_execution_binding(mut self, binding: serde_json::Value) -> Self {
242 self.tool_execution_binding = binding;
243 self
244 }
245
246 pub(crate) fn cancellation_token(
247 mut self,
248 cancellation_token: Option<tokio_util::sync::CancellationToken>,
249 ) -> Self {
250 self.cancellation_token = cancellation_token;
251 self
252 }
253
254 pub(crate) fn runtime_execution_context(
255 mut self,
256 context: crate::RuntimeExecutionContext<'run>,
257 ) -> Self {
258 self.runtime_execution_context = Some(context);
259 self
260 }
261
262 pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
263 self.runtime_process_id = process_id;
264 self
265 }
266
267 pub(crate) fn async_process(
268 mut self,
269 process_id: impl Into<String>,
270 cancellation_token: tokio_util::sync::CancellationToken,
271 ) -> Self {
272 self.async_process_id = Some(process_id.into());
273 self.cancellation_token = Some(cancellation_token);
274 self
275 }
276
277 pub(crate) fn process_events(
278 mut self,
279 process_id: impl Into<String>,
280 registry: Arc<dyn crate::ProcessRegistry>,
281 store: Option<Arc<dyn crate::RuntimePersistence>>,
282 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
283 queued_work_driver: Option<crate::QueuedWorkDriver>,
284 ) -> Self {
285 self.process_events = Some(ToolProcessEventContext {
286 process_id: process_id.into(),
287 registry,
288 store,
289 session_store_factory,
290 session_graph: Arc::clone(&self.session_graph),
291 queued_work_driver,
292 });
293 self
294 }
295
296 pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
297 self.parent_invocation = metadata;
298 self
299 }
300
301 pub(crate) fn child_execution_trace_hook(
302 mut self,
303 hook: Option<ToolChildExecutionTraceHook>,
304 ) -> Self {
305 self.child_execution_trace_hook = hook;
306 self
307 }
308
309 pub(crate) fn build(self) -> ToolContext<'run> {
310 ToolContext {
311 session_id: self.session_id,
312 agent_frame_id: self.agent_frame_id,
313 sessions: self.sessions,
314 session_lifecycle: self.session_lifecycle,
315 processes: self.processes,
316 process_cancel_ability: self.process_cancel_ability,
317 effect_controller: self.effect_controller,
318 runtime_dispatch: self.runtime_dispatch,
319 runtime_execution_context: self.runtime_execution_context,
320 cancellation_token: self.cancellation_token,
321 async_process_id: self.async_process_id,
322 runtime_process_id: self.runtime_process_id,
323 process_events: self.process_events,
324 attachment_store: self.attachment_store,
325 direct_completions: self.direct_completions,
326 prepared_payload: self.prepared_payload,
327 tool_execution_binding: self.tool_execution_binding,
328 tool_call_id: self.tool_call_id,
329 attempt_number: 1,
330 max_attempts: 1,
331 replay_key: None,
332 completion: self.completion,
333 durable_effects: self.durable_effects,
334 parent_invocation: self.parent_invocation,
335 execution_env_spec: self.execution_env_spec,
336 child_execution_trace_hook: self.child_execution_trace_hook,
337 }
338 }
339}
340
341impl<'run> ToolContext<'run> {
342 #[cfg(any(test, feature = "testing"))]
343 #[expect(
344 clippy::too_many_arguments,
345 reason = "testing constructor mirrors the sealed runtime tool context dependencies"
346 )]
347 pub(crate) fn builder(
348 session_id: String,
349 sessions: Arc<dyn SessionStateService>,
350 session_lifecycle: Arc<dyn SessionLifecycleService>,
351 session_graph: Arc<dyn SessionGraphService>,
352 processes: Arc<dyn crate::ProcessService>,
353 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
354 effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
355 attachment_store: Arc<dyn AttachmentStore>,
356 direct_completions: crate::DirectCompletionClient<'run>,
357 ) -> ToolContextBuilder<'run> {
358 ToolContextBuilder {
359 session_id,
360 agent_frame_id: String::new(),
361 sessions,
362 session_lifecycle,
363 session_graph,
364 processes,
365 process_cancel_ability,
366 effect_controller,
367 runtime_dispatch: None,
368 runtime_execution_context: None,
369 cancellation_token: None,
370 async_process_id: None,
371 runtime_process_id: None,
372 process_events: None,
373 attachment_store,
374 direct_completions,
375 prepared_payload: serde_json::Value::Null,
376 tool_execution_binding: serde_json::Value::Null,
377 tool_call_id: None,
378 completion: ToolCompletionState::default(),
379 durable_effects: ToolDurableEffectState::default(),
380 parent_invocation: None,
381 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
382 crate::PluginOptions::default(),
383 crate::SessionPolicy::default(),
384 ),
385 child_execution_trace_hook: None,
386 }
387 }
388
389 pub(crate) fn from_dispatch(
390 dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
391 ) -> ToolContextBuilder<'run> {
392 ToolContextBuilder::from_dispatch(dispatch)
393 }
394
395 pub fn session_id(&self) -> &str {
396 &self.session_id
397 }
398
399 pub fn agent_frame_id(&self) -> &str {
400 &self.agent_frame_id
401 }
402
403 pub fn sessions(&self) -> ToolSessionAdmin<'run> {
404 ToolSessionAdmin {
405 session_id: self.session_id.clone(),
406 sessions: Arc::clone(&self.sessions),
407 session_lifecycle: Arc::clone(&self.session_lifecycle),
408 effect_controller: self.effect_controller.clone(),
409 }
410 }
411
412 pub fn dispatch(&self) -> ToolDispatchClient<'run> {
413 ToolDispatchClient {
414 context: self.clone(),
415 }
416 }
417
418 pub fn triggers(&self) -> ToolTriggerClient<'run> {
419 ToolTriggerClient {
420 context: self.clone(),
421 }
422 }
423
424 pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
425 ToolSessionProcessAdmin {
426 session_id: self.session_id.clone(),
427 agent_frame_id: self.agent_frame_id.clone(),
428 processes: Arc::clone(&self.processes),
429 process_cancel_ability: Arc::clone(&self.process_cancel_ability),
430 effect_controller: self.effect_controller.clone(),
431 parent_invocation: self.parent_invocation.clone(),
432 tool_call_id: self.tool_call_id.clone(),
433 execution_env_spec: self.execution_env_spec.clone(),
434 }
435 }
436
437 pub fn emit_child_process_started(
438 &self,
439 process_id: impl Into<String>,
440 child_entry_name: Option<String>,
441 ) {
442 let Some(hook) = &self.child_execution_trace_hook else {
443 return;
444 };
445 hook.child_process_started(ToolChildProcessStarted {
446 process_id: process_id.into(),
447 child_entry_name,
448 });
449 }
450
451 pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
452 ToolDirectCompletionClient {
453 session_id: self.session_id.clone(),
454 tool_call_id: self.tool_call_id.clone(),
455 direct_completions: self.direct_completions.clone(),
456 parent_invocation: self.parent_invocation.clone(),
457 parent_tool_attempt_is_durable: self.parent_invocation.as_ref().is_some_and(
458 |invocation| {
459 invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
460 },
461 ) && self
462 .effect_controller
463 .controller()
464 .durability_tier()
465 == crate::DurabilityTier::Durable,
466 }
467 }
468
469 pub fn attachments(&self) -> ToolAttachmentClient {
470 ToolAttachmentClient {
471 store: Arc::clone(&self.attachment_store),
472 }
473 }
474
475 pub fn process_events(&self) -> ToolProcessEventClient {
476 ToolProcessEventClient {
477 context: self.process_events.clone(),
478 }
479 }
480
481 pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
488 let Some(tool_call_id) = self.tool_call_id.as_deref() else {
489 return Err(crate::RuntimeError::new(
490 "durable_effects_missing_call_id",
491 "durable effects require a prepared tool call id",
492 ));
493 };
494 if tool_call_id.trim().is_empty() {
495 return Err(crate::RuntimeError::new(
496 "durable_effects_missing_call_id",
497 "durable effects require a non-empty prepared tool call id",
498 ));
499 }
500 let scoped = self.effect_controller.scoped();
501 if !scoped.controller().supports_durable_effects() {
502 return Err(crate::RuntimeError::new(
503 "durable_effects_unavailable",
504 "this effect controller does not support durable tool effects",
505 ));
506 }
507 if self.parent_invocation.as_ref().is_some_and(|invocation| {
508 invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
509 }) && scoped.controller().durability_tier() == crate::DurabilityTier::Durable
510 {
511 return Err(crate::RuntimeError::new(
512 "durable_effects_unavailable_in_tool_attempt",
513 "durable tool sub-effects are not available inside a journaled tool attempt",
514 ));
515 }
516 Ok(ToolDurableEffects { context: self })
517 }
518
519 pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
520 self.cancellation_token.as_ref()
521 }
522
523 pub fn async_process_id(&self) -> Option<&str> {
524 self.async_process_id.as_deref()
525 }
526
527 pub fn runtime_process_id(&self) -> Option<&str> {
528 self.async_process_id
529 .as_deref()
530 .or(self.runtime_process_id.as_deref())
531 .or_else(|| {
532 self.process_events
533 .as_ref()
534 .map(|context| context.process_id.as_str())
535 })
536 }
537
538 pub fn tool_call_id(&self) -> Option<&str> {
539 self.tool_call_id.as_deref()
540 }
541
542 pub fn prepared_payload(&self) -> &serde_json::Value {
543 &self.prepared_payload
544 }
545
546 pub fn tool_execution_binding(&self) -> &serde_json::Value {
547 &self.tool_execution_binding
548 }
549
550 pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
551 where
552 T: serde::de::DeserializeOwned,
553 {
554 serde_json::from_value(self.prepared_payload.clone())
555 }
556
557 pub fn attempt_number(&self) -> u32 {
558 self.attempt_number
559 }
560
561 pub fn max_attempts(&self) -> u32 {
562 self.max_attempts
563 }
564
565 pub fn replay_key(&self) -> Option<&str> {
566 self.replay_key.as_deref()
567 }
568
569 pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
583 let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
584 crate::RuntimeError::new(
585 "tool_completion_key_missing_call_id",
586 "completion keys require a prepared tool call id",
587 )
588 })?;
589 let scoped = self.effect_controller.scoped();
590 let key = scoped
591 .controller()
592 .await_event_key(
593 scoped.execution_scope(),
594 crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
595 )
596 .await?;
597 self.completion.store(key)
598 }
599
600 pub(crate) fn take_completion_key(
601 &self,
602 ) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
603 self.completion.take()
604 }
605
606 pub fn with_async_process(
607 mut self,
608 process_id: impl Into<String>,
609 cancellation_token: tokio_util::sync::CancellationToken,
610 ) -> Self {
611 self.async_process_id = Some(process_id.into());
612 self.runtime_process_id = self.async_process_id.clone();
613 self.cancellation_token = Some(cancellation_token);
614 self
615 }
616
617 #[cfg(any(test, feature = "testing"))]
618 #[doc(hidden)]
619 pub fn with_process_events_for_testing(
620 mut self,
621 process_id: impl Into<String>,
622 registry: Arc<dyn crate::ProcessRegistry>,
623 ) -> Self {
624 self.process_events = Some(ToolProcessEventContext {
625 process_id: process_id.into(),
626 registry,
627 store: None,
628 session_store_factory: None,
629 session_graph: Arc::new(crate::plugin::NoopSessionManager),
630 queued_work_driver: None,
631 });
632 self
633 }
634
635 pub(crate) fn with_retry_context(
636 mut self,
637 tool_name: &str,
638 attempt_number: u32,
639 max_attempts: u32,
640 ) -> Self {
641 self.attempt_number = attempt_number.max(1);
642 self.max_attempts = max_attempts.max(1);
643 self.replay_key = self
644 .tool_call_id
645 .as_ref()
646 .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
647 self
648 }
649
650 pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
651 self.prepared_payload = payload;
652 self
653 }
654
655 pub(crate) fn with_tool_execution_binding(mut self, binding: serde_json::Value) -> Self {
656 self.tool_execution_binding = binding;
657 self
658 }
659
660 #[cfg(any(test, feature = "testing"))]
663 #[doc(hidden)]
664 #[expect(
665 clippy::too_many_arguments,
666 reason = "test-only constructor mirrors the sealed runtime tool context"
667 )]
668 pub fn __for_testing(
669 session_id: String,
670 sessions: Arc<dyn SessionStateService>,
671 session_lifecycle: Arc<dyn SessionLifecycleService>,
672 session_graph: Arc<dyn SessionGraphService>,
673 processes: Arc<dyn crate::ProcessService>,
674 attachment_store: Arc<dyn AttachmentStore>,
675 direct_completions: crate::DirectCompletionClient<'static>,
676 tool_call_id: Option<String>,
677 ) -> ToolContext<'static> {
678 ToolContext::builder(
679 session_id,
680 sessions,
681 session_lifecycle,
682 session_graph,
683 processes,
684 Arc::new(crate::DefaultProcessCancelAbility),
685 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
686 crate::InlineRuntimeEffectController,
687 )),
688 attachment_store,
689 direct_completions,
690 )
691 .tool_call_id(tool_call_id)
692 .build()
693 }
694
695 #[cfg(any(test, feature = "testing"))]
699 #[doc(hidden)]
700 #[expect(
701 clippy::too_many_arguments,
702 reason = "test-only constructor mirrors the sealed runtime context"
703 )]
704 pub fn __for_testing_with_process_cancel_ability(
705 session_id: String,
706 sessions: Arc<dyn SessionStateService>,
707 session_lifecycle: Arc<dyn SessionLifecycleService>,
708 session_graph: Arc<dyn SessionGraphService>,
709 processes: Arc<dyn crate::ProcessService>,
710 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
711 attachment_store: Arc<dyn AttachmentStore>,
712 direct_completions: crate::DirectCompletionClient<'static>,
713 tool_call_id: Option<String>,
714 ) -> ToolContext<'static> {
715 ToolContext::builder(
716 session_id,
717 sessions,
718 session_lifecycle,
719 session_graph,
720 processes,
721 process_cancel_ability,
722 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
723 crate::InlineRuntimeEffectController,
724 )),
725 attachment_store,
726 direct_completions,
727 )
728 .tool_call_id(tool_call_id)
729 .build()
730 }
731}
732
733pub struct ToolDurableEffects<'ctx, 'run> {
739 context: &'ctx ToolContext<'run>,
740}
741
742impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
743 pub async fn run_json<F, Fut>(
744 &self,
745 step_id: impl Into<String>,
746 input: serde_json::Value,
747 run: F,
748 ) -> Result<serde_json::Value, crate::RuntimeError>
749 where
750 F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
751 Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
752 {
753 let step_id = step_id.into();
754 if step_id.trim().is_empty() {
755 return Err(crate::RuntimeError::new(
756 "durable_effect_empty_step_id",
757 "durable effect step id must be non-empty",
758 ));
759 }
760 self.context.durable_effects.reserve_step(&step_id)?;
761 let invocation = self.step_invocation(
762 format!("durable-step:{step_id}"),
763 crate::RuntimeEffectKind::DurableStep,
764 format!("durable-step:{step_id}"),
765 )?;
766 let outcome = self
767 .context
768 .effect_controller
769 .controller()
770 .execute_effect(
771 crate::RuntimeEffectEnvelope::new(
772 invocation,
773 crate::RuntimeEffectCommand::DurableStep {
774 step_id,
775 input: input.clone(),
776 },
777 ),
778 crate::RuntimeEffectLocalExecutor::durable_step(run),
779 )
780 .await
781 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
782 outcome
783 .into_durable_step()
784 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
785 }
786
787 pub async fn external_event_key(
788 &self,
789 key: impl Into<String>,
790 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
791 let key = key.into();
792 if key.trim().is_empty() {
793 return Err(crate::RuntimeError::new(
794 "durable_effect_empty_event_key",
795 "durable effect external event key must be non-empty",
796 ));
797 }
798 let scoped = self.context.effect_controller.scoped();
799 scoped
800 .controller()
801 .await_event_key(
802 scoped.execution_scope(),
803 crate::AwaitEventWaitIdentity::Custom { key },
804 )
805 .await
806 }
807
808 pub async fn await_event_json(
809 &self,
810 key: crate::AwaitEventKey,
811 ) -> Result<serde_json::Value, crate::RuntimeError> {
812 let invocation = self.step_invocation(
813 format!("await-event:{}", key.key_id),
814 crate::RuntimeEffectKind::AwaitEvent,
815 format!("await-event:{}", key.key_id),
816 )?;
817 let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
818 let clock = self
819 .context
820 .runtime_dispatch
821 .as_ref()
822 .map(|dispatch| Arc::clone(&dispatch.clock))
823 .unwrap_or_else(|| Arc::new(crate::SystemClock));
824 let outcome = self
825 .context
826 .effect_controller
827 .controller()
828 .execute_effect(
829 crate::RuntimeEffectEnvelope::new(
830 invocation,
831 crate::RuntimeEffectCommand::AwaitEvent { key },
832 ),
833 crate::RuntimeEffectLocalExecutor::await_event_with_clock(
834 cancellation,
835 None,
836 clock,
837 ),
838 )
839 .await
840 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
841 match outcome
842 .into_await_event()
843 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
844 {
845 crate::Resolution::Ok(value) => Ok(value),
846 crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
847 crate::Resolution::Timeout => Err(crate::RuntimeError::new(
848 "durable_effect_event_timeout",
849 "durable effect external event wait timed out",
850 )),
851 crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
852 "durable_effect_event_cancelled",
853 "durable effect external event wait was cancelled",
854 )),
855 }
856 }
857
858 pub async fn emit_process_event(
859 &self,
860 event_type: impl Into<String>,
861 payload: serde_json::Value,
862 ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
863 let Some(process) = self.context.process_events.as_ref() else {
864 return Err(crate::RuntimeError::new(
865 "durable_effect_process_event_unavailable",
866 "durable effect process events are unavailable outside a durable process",
867 ));
868 };
869 let event_type = event_type.into();
870 if event_type.trim().is_empty() {
871 return Err(crate::RuntimeError::new(
872 "durable_effect_empty_process_event_type",
873 "durable effect process event type must be non-empty",
874 ));
875 }
876 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
877 crate::RuntimeError::new(
878 "durable_effects_missing_call_id",
879 "durable effects require a prepared tool call id",
880 )
881 })?;
882 let sequence = self.context.durable_effects.next_process_event_sequence();
883 let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
884 format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
885 );
886 self.context
887 .process_events()
888 .emit_request(request)
889 .await
890 .map_err(|err| {
891 crate::RuntimeError::new(
892 "durable_effect_process_event_append_failed",
893 err.to_string(),
894 )
895 })
896 .and_then(|event| {
897 if event.process_id == process.process_id {
898 Ok(event)
899 } else {
900 Err(crate::RuntimeError::new(
901 "durable_effect_process_event_process_mismatch",
902 "process event append returned an event for a different process",
903 ))
904 }
905 })
906 }
907
908 fn step_invocation(
909 &self,
910 effect_id_suffix: impl Into<String>,
911 kind: crate::RuntimeEffectKind,
912 replay_suffix: impl AsRef<str>,
913 ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
914 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
915 crate::RuntimeError::new(
916 "durable_effects_missing_call_id",
917 "durable effects require a prepared tool call id",
918 )
919 })?;
920 let effect_id_suffix = effect_id_suffix.into();
921 if let Some(parent) = self.context.parent_invocation.as_ref() {
922 return Ok(crate::runtime::causal::child_effect_invocation(
923 parent,
924 format!("{tool_call_id}:{effect_id_suffix}"),
925 kind,
926 replay_suffix,
927 ));
928 }
929 let scoped = self.context.effect_controller.scoped();
930 let replay_key = format!(
931 "{}:tool:{tool_call_id}:{}",
932 scoped.scope_id(),
933 replay_suffix.as_ref()
934 );
935 Ok(crate::RuntimeInvocation::effect(
936 crate::RuntimeScope::new(self.context.session_id.clone()),
937 format!("{tool_call_id}:{effect_id_suffix}"),
938 kind,
939 replay_key,
940 ))
941 }
942}
943
944#[derive(Clone, Debug, Serialize, Deserialize)]
950pub struct PreparedToolCall {
951 pub call_id: String,
952 pub tool_id: ToolId,
953 pub tool_name: String,
954 pub args: serde_json::Value,
955 #[serde(default, skip_serializing_if = "Option::is_none")]
956 pub replay: Option<ProviderReplayMeta>,
957 #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
958 pub prepared_payload: serde_json::Value,
959}
960
961impl PreparedToolCall {
962 pub fn identity(tool_id: ToolId, call: crate::sansio::PendingToolCall) -> Self {
963 Self {
964 call_id: call.call_id,
965 tool_id,
966 tool_name: call.tool_name,
967 args: call.args,
968 replay: call.replay,
969 prepared_payload: serde_json::Value::Null,
970 }
971 }
972
973 pub fn from_parts(
974 call_id: impl Into<String>,
975 tool_id: impl Into<ToolId>,
976 tool_name: impl Into<String>,
977 args: serde_json::Value,
978 replay: Option<ProviderReplayMeta>,
979 prepared_payload: serde_json::Value,
980 ) -> Self {
981 Self {
982 call_id: call_id.into(),
983 tool_id: tool_id.into(),
984 tool_name: tool_name.into(),
985 args,
986 replay,
987 prepared_payload,
988 }
989 }
990}
991
992#[derive(Clone, Debug, Serialize, Deserialize)]
998pub struct PreparedToolBatchCall {
999 pub call: PreparedToolCall,
1000 pub replay_suffix: String,
1001 #[serde(default, skip_serializing_if = "Option::is_none")]
1002 pub execution_grant: Option<Box<ToolExecutionGrant>>,
1003}
1004
1005#[derive(Clone, Debug, Serialize, Deserialize)]
1011pub struct PreparedToolBatch {
1012 pub batch_id: String,
1013 pub calls: Vec<PreparedToolBatchCall>,
1014}
1015
1016impl PreparedToolBatch {
1017 pub fn new(batch_id: impl Into<String>, calls: Vec<PreparedToolCall>) -> Self {
1018 let batch_id = batch_id.into();
1019 let calls = calls
1020 .into_iter()
1021 .enumerate()
1022 .map(|(index, call)| PreparedToolBatchCall {
1023 replay_suffix: format!("child:{index}:{}", call.call_id),
1024 call,
1025 execution_grant: None,
1026 })
1027 .collect();
1028 Self { batch_id, calls }
1029 }
1030
1031 pub fn new_with_grants(
1032 batch_id: impl Into<String>,
1033 calls: Vec<(PreparedToolCall, Option<ToolExecutionGrant>)>,
1034 ) -> Self {
1035 let batch_id = batch_id.into();
1036 let calls = calls
1037 .into_iter()
1038 .enumerate()
1039 .map(|(index, (call, execution_grant))| PreparedToolBatchCall {
1040 replay_suffix: format!("child:{index}:{}", call.call_id),
1041 call,
1042 execution_grant: execution_grant.map(Box::new),
1043 })
1044 .collect();
1045 Self { batch_id, calls }
1046 }
1047
1048 pub fn is_empty(&self) -> bool {
1049 self.calls.is_empty()
1050 }
1051
1052 pub fn len(&self) -> usize {
1053 self.calls.len()
1054 }
1055}
1056
1057#[derive(Clone, Debug, Serialize, Deserialize)]
1065pub struct ToolExecutionGrant {
1066 pub manifest: ToolManifest,
1068 pub contract: Box<ToolContract>,
1071 pub source_id: Option<String>,
1074 pub execution_binding: serde_json::Value,
1076}
1077
1078impl ToolExecutionGrant {
1079 pub fn new(manifest: ToolManifest, contract: ToolContract) -> Self {
1080 Self {
1081 manifest,
1082 contract: Box::new(contract),
1083 source_id: None,
1084 execution_binding: serde_json::Value::Null,
1085 }
1086 }
1087
1088 pub fn from_definition(definition: ToolDefinition) -> Self {
1089 Self::new(definition.manifest(), definition.contract())
1090 }
1091
1092 pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
1093 self.source_id = Some(source_id.into());
1094 self
1095 }
1096
1097 pub fn with_execution_binding(mut self, execution_binding: serde_json::Value) -> Self {
1098 self.execution_binding = execution_binding;
1099 self
1100 }
1101}
1102
1103#[derive(Clone)]
1104pub struct ToolPrepareContext {
1105 session_id: String,
1106 sessions: Arc<dyn SessionStateService>,
1107 turn_context: crate::TurnContext,
1108 tool_call_id: Option<String>,
1109 tool_execution_binding: serde_json::Value,
1110}
1111
1112impl ToolPrepareContext {
1113 pub(crate) fn with_execution_binding(
1114 session_id: String,
1115 sessions: Arc<dyn SessionStateService>,
1116 turn_context: crate::TurnContext,
1117 tool_call_id: Option<String>,
1118 tool_execution_binding: serde_json::Value,
1119 ) -> Self {
1120 Self {
1121 session_id,
1122 sessions,
1123 turn_context,
1124 tool_call_id,
1125 tool_execution_binding,
1126 }
1127 }
1128
1129 pub fn session_id(&self) -> &str {
1130 &self.session_id
1131 }
1132
1133 pub fn tool_call_id(&self) -> Option<&str> {
1134 self.tool_call_id.as_deref()
1135 }
1136
1137 pub fn tool_execution_binding(&self) -> &serde_json::Value {
1138 &self.tool_execution_binding
1139 }
1140
1141 pub fn turn_context(&self) -> &crate::TurnContext {
1142 &self.turn_context
1143 }
1144
1145 pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
1146 where
1147 T: 'static,
1148 {
1149 self.turn_context.plugin_input::<T>(plugin_id)
1150 }
1151
1152 pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
1153 self.sessions.snapshot_session(&self.session_id).await
1154 }
1155
1156 pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
1157 self.sessions.tool_catalog(&self.session_id).await
1158 }
1159
1160 pub async fn shared_tool_catalog(
1161 &self,
1162 ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
1163 self.sessions.shared_tool_catalog(&self.session_id).await
1164 }
1165}
1166
1167pub struct ToolPrepareCall<'a> {
1169 pub tool_id: ToolId,
1170 pub pending: crate::sansio::PendingToolCall,
1171 pub context: &'a ToolPrepareContext,
1172}
1173
1174pub struct ToolCall<'a> {
1181 pub name: &'a str,
1182 pub args: &'a serde_json::Value,
1183 pub context: &'a ToolContext<'a>,
1184 pub progress: Option<&'a ProgressSender>,
1185}
1186
1187#[async_trait::async_trait]
1195pub trait ToolProvider: Send + Sync + 'static {
1196 fn tool_manifests(&self) -> Vec<ToolManifest>;
1197 fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1198 self.tool_manifests()
1199 .into_iter()
1200 .find(|manifest| manifest.name == name)
1201 }
1202 fn resolve_manifest_by_id(&self, id: &ToolId) -> Option<ToolManifest> {
1203 self.tool_manifests()
1204 .into_iter()
1205 .find(|manifest| manifest.id == *id)
1206 }
1207 fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1208 fn resolve_contract_by_id(&self, id: &ToolId) -> Option<Arc<ToolContract>> {
1209 let manifest = self.resolve_manifest_by_id(id)?;
1210 self.resolve_contract(&manifest.name)
1211 }
1212 async fn prepare_tool_call(
1213 &self,
1214 call: ToolPrepareCall<'_>,
1215 ) -> Result<PreparedToolCall, ToolResult> {
1216 Ok(PreparedToolCall::identity(call.tool_id, call.pending))
1217 }
1218 async fn prepare_granted_tool_call(
1219 &self,
1220 grant: &ToolExecutionGrant,
1221 call: ToolPrepareCall<'_>,
1222 ) -> Result<PreparedToolCall, ToolResult> {
1223 let _ = call;
1224 Err(ToolResult::err_fmt(format_args!(
1225 "Granted execution is unsupported for tool id `{}`",
1226 grant.manifest.id
1227 )))
1228 }
1229 async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1230 async fn execute_granted(
1231 &self,
1232 grant: &ToolExecutionGrant,
1233 args: &serde_json::Value,
1234 context: &ToolContext<'_>,
1235 progress: Option<&ProgressSender>,
1236 ) -> ToolResult {
1237 let _ = (args, context, progress);
1238 ToolResult::err_fmt(format_args!(
1239 "Granted execution is unsupported for tool id `{}`",
1240 grant.manifest.id
1241 ))
1242 }
1243 async fn execute_by_id(
1244 &self,
1245 tool_id: &ToolId,
1246 args: &serde_json::Value,
1247 context: &ToolContext<'_>,
1248 progress: Option<&ProgressSender>,
1249 ) -> ToolResult {
1250 let Some(manifest) = self.resolve_manifest_by_id(tool_id) else {
1251 return ToolResult::err_fmt(format!("Unknown tool id: {tool_id}"));
1252 };
1253 self.execute(ToolCall {
1254 name: &manifest.name,
1255 args,
1256 context,
1257 progress,
1258 })
1259 .await
1260 }
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265 use super::*;
1266 use crate::ProcessRegistry;
1267 use crate::RuntimeEffectController;
1268 use std::sync::atomic::{AtomicU64, Ordering};
1269
1270 struct NoDurableEffectController;
1271
1272 #[async_trait::async_trait]
1273 impl crate::RuntimeEffectController for NoDurableEffectController {
1274 async fn execute_effect(
1275 &self,
1276 _envelope: crate::RuntimeEffectEnvelope,
1277 _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
1278 ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
1279 Err(crate::RuntimeEffectControllerError::new(
1280 "unexpected_effect",
1281 "test controller should not execute effects",
1282 ))
1283 }
1284 }
1285
1286 fn test_context_with_controller(
1287 tool_call_id: Option<String>,
1288 controller: Arc<dyn crate::RuntimeEffectController>,
1289 ) -> ToolContext<'static> {
1290 ToolContext::builder(
1291 "session-1".to_string(),
1292 Arc::new(crate::testing::MockSessionManager::default()),
1293 Arc::new(crate::testing::MockSessionManager::default()),
1294 Arc::new(crate::testing::MockSessionManager::default()),
1295 Arc::new(crate::UnavailableProcessService),
1296 Arc::new(crate::DefaultProcessCancelAbility),
1297 crate::runtime::RuntimeEffectControllerHandle::shared(controller),
1298 Arc::new(crate::InMemoryAttachmentStore::new()),
1299 crate::DirectCompletionClient::unavailable(
1300 "direct completions are unavailable in this test context",
1301 ),
1302 )
1303 .tool_call_id(tool_call_id)
1304 .build()
1305 }
1306
1307 #[test]
1308 fn tool_context_builder_carries_call_payload_and_cancellation_state() {
1309 let cancellation = tokio_util::sync::CancellationToken::new();
1310 let prepared = PreparedToolCall::from_parts(
1311 "call-1",
1312 "tool:demo_tool",
1313 "demo_tool",
1314 serde_json::json!({ "input": true }),
1315 None,
1316 serde_json::json!({ "prepared": true }),
1317 );
1318
1319 let context = ToolContext::builder(
1320 "session-1".to_string(),
1321 Arc::new(crate::testing::MockSessionManager::default()),
1322 Arc::new(crate::testing::MockSessionManager::default()),
1323 Arc::new(crate::testing::MockSessionManager::default()),
1324 Arc::new(crate::UnavailableProcessService),
1325 Arc::new(crate::DefaultProcessCancelAbility),
1326 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1327 crate::InlineRuntimeEffectController,
1328 )),
1329 Arc::new(crate::InMemoryAttachmentStore::new()),
1330 crate::DirectCompletionClient::unavailable(
1331 "direct completions are unavailable in this test context",
1332 ),
1333 )
1334 .prepared_call(&prepared)
1335 .cancellation_token(Some(cancellation.clone()))
1336 .async_process("process-1", cancellation.clone())
1337 .build();
1338
1339 assert_eq!(context.session_id(), "session-1");
1340 assert_eq!(context.tool_call_id(), Some("call-1"));
1341 assert_eq!(
1342 context.prepared_payload(),
1343 &serde_json::json!({ "prepared": true })
1344 );
1345 assert_eq!(context.async_process_id(), Some("process-1"));
1346 assert!(context.cancellation_token().is_some());
1347 }
1348
1349 #[test]
1350 fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
1351 let missing_call =
1352 test_context_with_controller(None, Arc::new(crate::InlineRuntimeEffectController));
1353 let err = match missing_call.durable_effects() {
1354 Ok(_) => panic!("missing prepared tool call id should fail"),
1355 Err(err) => err,
1356 };
1357 assert_eq!(err.code.as_str(), "durable_effects_missing_call_id");
1358
1359 let unsupported = test_context_with_controller(
1360 Some("call-1".to_string()),
1361 Arc::new(NoDurableEffectController),
1362 );
1363 let err = match unsupported.durable_effects() {
1364 Ok(_) => panic!("unsupported controller should fail"),
1365 Err(err) => err,
1366 };
1367 assert_eq!(err.code.as_str(), "durable_effects_unavailable");
1368 }
1369
1370 #[tokio::test]
1371 async fn durable_run_json_executes_and_maps_closure_errors() {
1372 let context = test_context_with_controller(
1373 Some("call-run-json".to_string()),
1374 Arc::new(crate::InlineRuntimeEffectController),
1375 );
1376 let durable = context.durable_effects().expect("durable effects");
1377 let value = durable
1378 .run_json(
1379 "create",
1380 serde_json::json!({ "x": 1 }),
1381 |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
1382 )
1383 .await
1384 .expect("durable step");
1385 assert_eq!(value, serde_json::json!({ "seen": 1 }));
1386
1387 let err = durable
1388 .run_json("fail", serde_json::json!({}), |_| async {
1389 Err(crate::RuntimeError::new(
1390 "durable_step_failed",
1391 "step failed",
1392 ))
1393 })
1394 .await
1395 .expect_err("closure error");
1396 assert_eq!(err.code.as_str(), "durable_step_failed");
1397 assert_eq!(err.message, "step failed");
1398 }
1399
1400 #[tokio::test]
1401 async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
1402 let context = test_context_with_controller(
1403 Some("call-step-ids".to_string()),
1404 Arc::new(crate::InlineRuntimeEffectController),
1405 );
1406 let durable = context.durable_effects().expect("durable effects");
1407 let runs = Arc::new(AtomicU64::new(0));
1408
1409 let err = durable
1410 .run_json("", serde_json::Value::Null, {
1411 let runs = Arc::clone(&runs);
1412 move |_| async move {
1413 runs.fetch_add(1, Ordering::Relaxed);
1414 Ok(serde_json::Value::Null)
1415 }
1416 })
1417 .await
1418 .expect_err("empty step id");
1419 assert_eq!(err.code.as_str(), "durable_effect_empty_step_id");
1420 assert_eq!(runs.load(Ordering::Relaxed), 0);
1421
1422 durable
1423 .run_json("same", serde_json::Value::Null, {
1424 let runs = Arc::clone(&runs);
1425 move |_| async move {
1426 runs.fetch_add(1, Ordering::Relaxed);
1427 Ok(serde_json::Value::Null)
1428 }
1429 })
1430 .await
1431 .expect("first step");
1432 let err = durable
1433 .run_json("same", serde_json::Value::Null, {
1434 let runs = Arc::clone(&runs);
1435 move |_| async move {
1436 runs.fetch_add(1, Ordering::Relaxed);
1437 Ok(serde_json::Value::Null)
1438 }
1439 })
1440 .await
1441 .expect_err("duplicate step id");
1442 assert_eq!(err.code.as_str(), "durable_effect_duplicate_step_id");
1443 assert_eq!(runs.load(Ordering::Relaxed), 1);
1444 }
1445
1446 #[tokio::test]
1447 async fn durable_external_event_key_is_custom_and_stable() {
1448 let context = test_context_with_controller(
1449 Some("call-event-key".to_string()),
1450 Arc::new(crate::InlineRuntimeEffectController),
1451 );
1452 let durable = context.durable_effects().expect("durable effects");
1453 let first = durable
1454 .external_event_key("tool-event-stable")
1455 .await
1456 .expect("first key");
1457 let second = durable
1458 .external_event_key("tool-event-stable")
1459 .await
1460 .expect("second key");
1461
1462 assert_eq!(first, second);
1463 assert_eq!(
1464 first.wait,
1465 crate::AwaitEventWaitIdentity::Custom {
1466 key: "tool-event-stable".to_string()
1467 }
1468 );
1469 }
1470
1471 #[tokio::test]
1472 async fn durable_await_event_json_maps_terminal_resolutions() {
1473 let controller = Arc::new(crate::InlineRuntimeEffectController);
1474 let context =
1475 test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
1476 let durable = context.durable_effects().expect("durable effects");
1477
1478 let ok_key = durable
1479 .external_event_key("tool-event-ok")
1480 .await
1481 .expect("ok key");
1482 controller
1483 .resolve_await_event(
1484 &ok_key,
1485 crate::Resolution::Ok(serde_json::json!({ "answer": 42 })),
1486 )
1487 .await
1488 .expect("resolve ok");
1489 let value = durable
1490 .await_event_json(ok_key)
1491 .await
1492 .expect("await ok value");
1493 assert_eq!(value, serde_json::json!({ "answer": 42 }));
1494
1495 let err_key = durable
1496 .external_event_key("tool-event-err")
1497 .await
1498 .expect("err key");
1499 controller
1500 .resolve_await_event(
1501 &err_key,
1502 crate::Resolution::Err(crate::ExternalCompletionError::new(
1503 "external_bad",
1504 "external failed",
1505 )),
1506 )
1507 .await
1508 .expect("resolve err");
1509 let err = durable
1510 .await_event_json(err_key)
1511 .await
1512 .expect_err("await err value");
1513 assert_eq!(err.code.as_str(), "external_bad");
1514
1515 let cancelled_key = durable
1516 .external_event_key("tool-event-cancelled")
1517 .await
1518 .expect("cancelled key");
1519 controller
1520 .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
1521 .await
1522 .expect("resolve cancelled");
1523 let err = durable
1524 .await_event_json(cancelled_key)
1525 .await
1526 .expect_err("await cancelled value");
1527 assert_eq!(err.code.as_str(), "durable_effect_event_cancelled");
1528
1529 let timeout_key = durable
1530 .external_event_key("tool-event-timeout")
1531 .await
1532 .expect("timeout key");
1533 controller
1534 .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
1535 .await
1536 .expect("resolve timeout");
1537 let err = durable
1538 .await_event_json(timeout_key)
1539 .await
1540 .expect_err("await timeout value");
1541 assert_eq!(err.code.as_str(), "durable_effect_event_timeout");
1542 }
1543
1544 #[tokio::test]
1545 async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
1546 let context = test_context_with_controller(
1547 Some("call-no-process".to_string()),
1548 Arc::new(crate::InlineRuntimeEffectController),
1549 );
1550 let err = context
1551 .durable_effects()
1552 .expect("durable effects")
1553 .emit_process_event("tool.event", serde_json::json!({}))
1554 .await
1555 .expect_err("outside process");
1556 assert_eq!(
1557 err.code.as_str(),
1558 "durable_effect_process_event_unavailable"
1559 );
1560
1561 let registry = Arc::new(crate::TestLocalProcessRegistry::default());
1562 let process_id = "process:durable-tool-event";
1563 registry
1564 .register_process(
1565 crate::ProcessRegistration::new(
1566 process_id,
1567 crate::ProcessInput::External {
1568 metadata: serde_json::json!({}),
1569 },
1570 crate::ProcessProvenance::host(),
1571 )
1572 .with_extra_event_types([crate::ProcessEventType {
1573 name: "tool.event".to_string(),
1574 payload_schema: crate::LashSchema::any(),
1575 semantics: crate::ProcessEventSemanticsSpec::default(),
1576 }]),
1577 )
1578 .await
1579 .expect("register process");
1580 let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;
1581 let context = test_context_with_controller(
1582 Some("call-process-event".to_string()),
1583 Arc::new(crate::InlineRuntimeEffectController),
1584 )
1585 .with_process_events_for_testing(process_id, registry_dyn);
1586
1587 let event = context
1588 .durable_effects()
1589 .expect("durable effects")
1590 .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
1591 .await
1592 .expect("process event");
1593 assert_eq!(event.process_id, process_id);
1594 assert_eq!(event.event_type, "tool.event");
1595 assert_eq!(event.payload, serde_json::json!({ "ok": true }));
1596 assert_eq!(
1597 event.invocation.replay_key(),
1598 Some("tool:call-process-event:durable-process-event:0")
1599 );
1600
1601 let append_err = context
1602 .durable_effects()
1603 .expect("durable effects")
1604 .emit_process_event("undeclared.event", serde_json::json!({}))
1605 .await
1606 .expect_err("undeclared event type must fail the append");
1607 assert_eq!(
1608 append_err.code.as_str(),
1609 "durable_effect_process_event_append_failed"
1610 );
1611 }
1612}