1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10 PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolDefinition, ToolId, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
/// A text message tagged with a kind string.
///
/// This is the item type carried by [`ProgressSender`].
#[derive(Clone, Debug)]
pub struct SandboxMessage {
    /// Message text.
    pub text: String,
    /// Kind tag for the message; the set of accepted values is not defined in
    /// this part of the file.
    pub kind: String,
}
37
/// Sending half of an unbounded Tokio mpsc channel carrying [`SandboxMessage`] values.
pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
/// Shared slot holding at most one completion [`crate::AwaitEventKey`].
///
/// The slot sits behind an `Arc<Mutex<_>>`, so clones of this state read and
/// write the same stored key.
#[derive(Clone, Default)]
pub(crate) struct ToolCompletionState {
    /// The stored key; `None` when nothing was stored or the key was taken.
    key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
}
45
46impl ToolCompletionState {
47 fn store(
48 &self,
49 key: crate::AwaitEventKey,
50 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51 let mut guard = self.key.lock().map_err(|_| {
52 crate::RuntimeError::new(
53 "tool_completion_state_poisoned",
54 "tool completion key state lock poisoned",
55 )
56 })?;
57 if let Some(existing) = guard.as_ref() {
58 return Ok(existing.clone());
59 }
60 *guard = Some(key.clone());
61 Ok(key)
62 }
63
64 pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65 self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66 crate::RuntimeError::new(
67 "tool_completion_state_poisoned",
68 "tool completion key state lock poisoned",
69 )
70 })
71 }
72}
73
/// Bookkeeping for durable effects: the step ids already reserved and a
/// counter used to number process events.
///
/// Both members sit behind `Arc`s, so clones of this state share them.
#[derive(Clone, Default)]
pub(crate) struct ToolDurableEffectState {
    /// Step ids that have already been reserved through `reserve_step`.
    step_ids: Arc<Mutex<HashSet<String>>>,
    /// Counter handed out by `next_process_event_sequence`; starts at 0 via `Default`.
    process_event_sequence: Arc<AtomicU64>,
}
79
80impl ToolDurableEffectState {
81 fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82 let mut guard = self.step_ids.lock().map_err(|_| {
83 crate::RuntimeError::new(
84 "durable_effect_state_poisoned",
85 "durable effect step state lock poisoned",
86 )
87 })?;
88 if !guard.insert(step_id.to_string()) {
89 return Err(crate::RuntimeError::new(
90 "durable_effect_duplicate_step_id",
91 format!("durable effect step id `{step_id}` was already used by this tool call"),
92 ));
93 }
94 Ok(())
95 }
96
97 fn next_process_event_sequence(&self) -> u64 {
98 self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99 }
100}
101
/// Context bundle for a single tool call, assembled by `ToolContextBuilder::build`.
///
/// Cloning copies the owned values and clones the `Arc` handles; `completion`
/// and `durable_effects` keep their data behind `Arc`s, so clones share that
/// state.
#[derive(Clone)]
pub struct ToolContext<'run> {
    /// Session identifier, exposed through `session_id()`.
    pub(crate) session_id: String,
    /// Agent frame identifier, exposed through `agent_frame_id()`.
    pub(crate) agent_frame_id: crate::AgentFrameId,
    /// Session state service handle.
    pub(crate) sessions: Arc<dyn SessionStateService>,
    /// Session lifecycle service handle.
    pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
    /// Process service handle.
    pub(crate) processes: Arc<dyn crate::ProcessService>,
    /// Process cancellation capability handle.
    pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    /// Handle to the runtime effect controller used for durable effects and
    /// await-event keys.
    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
    /// Dispatch context this tool context was derived from, when built via
    /// `from_dispatch`; `None` for the testing builder.
    pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
    /// Optional runtime execution context.
    pub(crate) runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
    /// Optional cancellation token, exposed through `cancellation_token()`.
    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
    /// Async process id; takes precedence in `runtime_process_id()`.
    pub(crate) async_process_id: Option<String>,
    /// Explicit runtime process id; second choice in `runtime_process_id()`.
    pub(crate) runtime_process_id: Option<String>,
    /// Process-event plumbing; `None` when process events are unavailable.
    pub(crate) process_events: Option<ToolProcessEventContext>,
    /// Attachment store handle.
    pub(crate) attachment_store: Arc<dyn AttachmentStore>,
    /// Direct completion client.
    pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
    /// JSON payload attached to the prepared call; `Null` by default.
    pub(crate) prepared_payload: serde_json::Value,
    /// JSON execution binding for the tool; `Null` by default.
    pub(crate) tool_execution_binding: serde_json::Value,
    /// Id of the prepared tool call, when there is one.
    pub(crate) tool_call_id: Option<String>,
    /// Attempt number; the builder sets 1 and `with_retry_context` overrides
    /// it (clamped to at least 1).
    pub(crate) attempt_number: u32,
    /// Maximum attempts; the builder sets 1 and `with_retry_context`
    /// overrides it (clamped to at least 1).
    pub(crate) max_attempts: u32,
    /// Replay key; `None` until `with_retry_context` derives one from the
    /// session id, call id, and tool name.
    pub(crate) replay_key: Option<String>,
    /// Slot for the completion key produced by `completion_key()`.
    pub(crate) completion: ToolCompletionState,
    /// Durable-effect bookkeeping (reserved step ids, process event counter).
    pub(crate) durable_effects: ToolDurableEffectState,
    /// Parent invocation, when there is one.
    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
    /// Execution environment spec handed to the process admin facade.
    pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
    /// Optional hook notified by `emit_child_process_started`.
    pub(crate) child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
}
134
/// Payload passed to a [`ToolChildExecutionTraceHook`] callback.
#[derive(Clone)]
pub struct ToolChildProcessStarted {
    /// Identifier of the started process.
    pub process_id: String,
    /// Optional entry name of the child.
    pub child_entry_name: Option<String>,
}

/// Cloneable wrapper around a shared callback that receives
/// [`ToolChildProcessStarted`] events. Clones invoke the same callback.
#[derive(Clone)]
pub struct ToolChildExecutionTraceHook {
    on_child_process_started: Arc<dyn Fn(ToolChildProcessStarted) + Send + Sync>,
}

impl ToolChildExecutionTraceHook {
    /// Wraps `on_child_process_started` so it can be shared and invoked later.
    pub fn new(
        on_child_process_started: impl Fn(ToolChildProcessStarted) + Send + Sync + 'static,
    ) -> Self {
        let callback: Arc<dyn Fn(ToolChildProcessStarted) + Send + Sync> =
            Arc::new(on_child_process_started);
        Self {
            on_child_process_started: callback,
        }
    }

    /// Invokes the wrapped callback with `event`.
    pub fn child_process_started(&self, event: ToolChildProcessStarted) {
        let callback = &*self.on_child_process_started;
        callback(event);
    }
}
159
/// Dependencies behind `ToolProcessEventClient`, bound to one process id.
#[derive(Clone)]
pub(crate) struct ToolProcessEventContext {
    /// Id of the process the events belong to; also the last fallback of
    /// `ToolContext::runtime_process_id`.
    process_id: String,
    /// Process registry handle.
    registry: Arc<dyn crate::ProcessRegistry>,
    /// Process awaiter.
    awaiter: crate::ProcessAwaiter,
    /// Optional runtime persistence handle.
    store: Option<Arc<dyn crate::RuntimePersistence>>,
    /// Optional session store factory.
    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
    /// Session graph service handle.
    session_graph: Arc<dyn SessionGraphService>,
    /// Optional queued-work driver.
    queued_work_driver: Option<crate::QueuedWorkDriver>,
}
170
/// Builder for [`ToolContext`].
///
/// Carries the same dependencies as the context plus `session_graph`. The
/// retry-related fields of the context (`attempt_number`, `max_attempts`,
/// `replay_key`) are not part of the builder; `build` fills them with defaults.
pub(crate) struct ToolContextBuilder<'run> {
    session_id: String,
    agent_frame_id: crate::AgentFrameId,
    sessions: Arc<dyn SessionStateService>,
    session_lifecycle: Arc<dyn SessionLifecycleService>,
    /// Not copied into `ToolContext` directly; the `process_events` setter
    /// clones it into the `ToolProcessEventContext` it creates.
    session_graph: Arc<dyn SessionGraphService>,
    processes: Arc<dyn crate::ProcessService>,
    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
    /// `Some` when the builder was seeded by `from_dispatch`.
    runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
    runtime_execution_context: Option<crate::RuntimeExecutionContext<'run>>,
    cancellation_token: Option<tokio_util::sync::CancellationToken>,
    async_process_id: Option<String>,
    runtime_process_id: Option<String>,
    process_events: Option<ToolProcessEventContext>,
    attachment_store: Arc<dyn AttachmentStore>,
    direct_completions: crate::DirectCompletionClient<'run>,
    /// Defaults to `Null`; replaced by `prepared_call`.
    prepared_payload: serde_json::Value,
    /// Defaults to `Null`; replaced by `tool_execution_binding`.
    tool_execution_binding: serde_json::Value,
    tool_call_id: Option<String>,
    completion: ToolCompletionState,
    durable_effects: ToolDurableEffectState,
    parent_invocation: Option<crate::RuntimeInvocation>,
    execution_env_spec: crate::ProcessExecutionEnvSpec,
    child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
}
197
198impl<'run> ToolContextBuilder<'run> {
199 pub(crate) fn from_dispatch(
200 dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
201 ) -> Self {
202 Self {
203 session_id: dispatch.session_id.clone(),
204 agent_frame_id: dispatch.agent_frame_id.clone(),
205 sessions: Arc::clone(&dispatch.sessions),
206 session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
207 session_graph: Arc::clone(&dispatch.session_graph),
208 processes: Arc::clone(&dispatch.processes),
209 process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
210 effect_controller: dispatch.effect_controller.clone(),
211 runtime_dispatch: Some(Arc::clone(&dispatch)),
212 runtime_execution_context: None,
213 cancellation_token: None,
214 async_process_id: None,
215 runtime_process_id: None,
216 process_events: None,
217 attachment_store: Arc::clone(&dispatch.attachment_store),
218 direct_completions: dispatch.direct_completions.clone(),
219 prepared_payload: serde_json::Value::Null,
220 tool_execution_binding: serde_json::Value::Null,
221 tool_call_id: None,
222 completion: ToolCompletionState::default(),
223 durable_effects: ToolDurableEffectState::default(),
224 parent_invocation: dispatch.parent_invocation.clone(),
225 execution_env_spec: dispatch.execution_env_spec.clone(),
226 child_execution_trace_hook: None,
227 }
228 }
229
230 #[cfg(any(test, feature = "testing"))]
231 pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
232 self.tool_call_id = tool_call_id.into();
233 self
234 }
235
236 pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
237 self.tool_call_id = Some(call.call_id.clone());
238 self.prepared_payload = call.prepared_payload.clone();
239 self
240 }
241
242 pub(crate) fn tool_execution_binding(mut self, binding: serde_json::Value) -> Self {
243 self.tool_execution_binding = binding;
244 self
245 }
246
247 pub(crate) fn cancellation_token(
248 mut self,
249 cancellation_token: Option<tokio_util::sync::CancellationToken>,
250 ) -> Self {
251 self.cancellation_token = cancellation_token;
252 self
253 }
254
255 pub(crate) fn runtime_execution_context(
256 mut self,
257 context: crate::RuntimeExecutionContext<'run>,
258 ) -> Self {
259 self.runtime_execution_context = Some(context);
260 self
261 }
262
263 pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
264 self.runtime_process_id = process_id;
265 self
266 }
267
268 pub(crate) fn async_process(
269 mut self,
270 process_id: impl Into<String>,
271 cancellation_token: tokio_util::sync::CancellationToken,
272 ) -> Self {
273 self.async_process_id = Some(process_id.into());
274 self.cancellation_token = Some(cancellation_token);
275 self
276 }
277
278 pub(crate) fn process_events(
279 mut self,
280 process_id: impl Into<String>,
281 registry: Arc<dyn crate::ProcessRegistry>,
282 awaiter: crate::ProcessAwaiter,
283 store: Option<Arc<dyn crate::RuntimePersistence>>,
284 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
285 queued_work_driver: Option<crate::QueuedWorkDriver>,
286 ) -> Self {
287 self.process_events = Some(ToolProcessEventContext {
288 process_id: process_id.into(),
289 registry,
290 awaiter,
291 store,
292 session_store_factory,
293 session_graph: Arc::clone(&self.session_graph),
294 queued_work_driver,
295 });
296 self
297 }
298
299 pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
300 self.parent_invocation = metadata;
301 self
302 }
303
304 pub(crate) fn child_execution_trace_hook(
305 mut self,
306 hook: Option<ToolChildExecutionTraceHook>,
307 ) -> Self {
308 self.child_execution_trace_hook = hook;
309 self
310 }
311
312 pub(crate) fn build(self) -> ToolContext<'run> {
313 ToolContext {
314 session_id: self.session_id,
315 agent_frame_id: self.agent_frame_id,
316 sessions: self.sessions,
317 session_lifecycle: self.session_lifecycle,
318 processes: self.processes,
319 process_cancel_ability: self.process_cancel_ability,
320 effect_controller: self.effect_controller,
321 runtime_dispatch: self.runtime_dispatch,
322 runtime_execution_context: self.runtime_execution_context,
323 cancellation_token: self.cancellation_token,
324 async_process_id: self.async_process_id,
325 runtime_process_id: self.runtime_process_id,
326 process_events: self.process_events,
327 attachment_store: self.attachment_store,
328 direct_completions: self.direct_completions,
329 prepared_payload: self.prepared_payload,
330 tool_execution_binding: self.tool_execution_binding,
331 tool_call_id: self.tool_call_id,
332 attempt_number: 1,
333 max_attempts: 1,
334 replay_key: None,
335 completion: self.completion,
336 durable_effects: self.durable_effects,
337 parent_invocation: self.parent_invocation,
338 execution_env_spec: self.execution_env_spec,
339 child_execution_trace_hook: self.child_execution_trace_hook,
340 }
341 }
342}
343
344impl<'run> ToolContext<'run> {
/// Test-only entry point that creates a builder from explicit dependencies.
///
/// Everything not passed in starts empty: the agent frame id is an empty
/// string, optional fields are `None`, JSON values are `Null`, and the
/// execution env spec is built from default plugin options and session policy.
#[cfg(any(test, feature = "testing"))]
#[expect(
    clippy::too_many_arguments,
    reason = "testing constructor mirrors the sealed runtime tool context dependencies"
)]
pub(crate) fn builder(
    session_id: String,
    sessions: Arc<dyn SessionStateService>,
    session_lifecycle: Arc<dyn SessionLifecycleService>,
    session_graph: Arc<dyn SessionGraphService>,
    processes: Arc<dyn crate::ProcessService>,
    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
    attachment_store: Arc<dyn AttachmentStore>,
    direct_completions: crate::DirectCompletionClient<'run>,
) -> ToolContextBuilder<'run> {
    let execution_env_spec = crate::ProcessExecutionEnvSpec::new(
        crate::PluginOptions::default(),
        crate::SessionPolicy::default(),
    );
    let completion = ToolCompletionState::default();
    let durable_effects = ToolDurableEffectState::default();
    ToolContextBuilder {
        // Caller-supplied dependencies.
        session_id,
        sessions,
        session_lifecycle,
        session_graph,
        processes,
        process_cancel_ability,
        effect_controller,
        attachment_store,
        direct_completions,
        // Everything else starts empty.
        agent_frame_id: String::new(),
        runtime_dispatch: None,
        runtime_execution_context: None,
        cancellation_token: None,
        async_process_id: None,
        runtime_process_id: None,
        process_events: None,
        prepared_payload: serde_json::Value::Null,
        tool_execution_binding: serde_json::Value::Null,
        tool_call_id: None,
        completion,
        durable_effects,
        parent_invocation: None,
        execution_env_spec,
        child_execution_trace_hook: None,
    }
}
391
/// Creates a builder seeded from `dispatch`; forwards to
/// `ToolContextBuilder::from_dispatch`.
pub(crate) fn from_dispatch(
    dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
) -> ToolContextBuilder<'run> {
    ToolContextBuilder::from_dispatch(dispatch)
}

/// Returns the session id.
pub fn session_id(&self) -> &str {
    &self.session_id
}

/// Returns the agent frame id.
pub fn agent_frame_id(&self) -> &str {
    &self.agent_frame_id
}
405
/// Returns a session admin facade holding clones of this context's session
/// id, session services, and effect controller handle.
pub fn sessions(&self) -> ToolSessionAdmin<'run> {
    ToolSessionAdmin {
        session_id: self.session_id.clone(),
        sessions: Arc::clone(&self.sessions),
        session_lifecycle: Arc::clone(&self.session_lifecycle),
        effect_controller: self.effect_controller.clone(),
    }
}

/// Returns a dispatch client that owns a clone of this whole context.
pub fn dispatch(&self) -> ToolDispatchClient<'run> {
    ToolDispatchClient {
        context: self.clone(),
    }
}

/// Returns a trigger client that owns a clone of this whole context.
pub fn triggers(&self) -> ToolTriggerClient<'run> {
    ToolTriggerClient {
        context: self.clone(),
    }
}

/// Returns a process admin facade holding clones of the session and frame
/// ids, the process services, the effect controller handle, the parent
/// invocation, the tool call id, and the execution env spec.
pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
    ToolSessionProcessAdmin {
        session_id: self.session_id.clone(),
        agent_frame_id: self.agent_frame_id.clone(),
        processes: Arc::clone(&self.processes),
        process_cancel_ability: Arc::clone(&self.process_cancel_ability),
        effect_controller: self.effect_controller.clone(),
        parent_invocation: self.parent_invocation.clone(),
        tool_call_id: self.tool_call_id.clone(),
        execution_env_spec: self.execution_env_spec.clone(),
    }
}
439
440 pub fn emit_child_process_started(
441 &self,
442 process_id: impl Into<String>,
443 child_entry_name: Option<String>,
444 ) {
445 let Some(hook) = &self.child_execution_trace_hook else {
446 return;
447 };
448 hook.child_process_started(ToolChildProcessStarted {
449 process_id: process_id.into(),
450 child_entry_name,
451 });
452 }
453
454 pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
455 ToolDirectCompletionClient {
456 session_id: self.session_id.clone(),
457 tool_call_id: self.tool_call_id.clone(),
458 direct_completions: self.direct_completions.clone(),
459 parent_invocation: self.parent_invocation.clone(),
460 parent_tool_attempt_is_durable: self.parent_invocation.as_ref().is_some_and(
461 |invocation| {
462 invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
463 },
464 ) && self
465 .effect_controller
466 .controller()
467 .durability_tier()
468 == crate::DurabilityTier::Durable,
469 }
470 }
471
/// Returns an attachment client sharing this context's attachment store.
pub fn attachments(&self) -> ToolAttachmentClient {
    ToolAttachmentClient {
        store: Arc::clone(&self.attachment_store),
    }
}

/// Returns a process-event client carrying a clone of the optional
/// process-event context (`None` when none was configured).
pub fn process_events(&self) -> ToolProcessEventClient {
    ToolProcessEventClient {
        context: self.process_events.clone(),
    }
}
483
484 pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
491 let Some(tool_call_id) = self.tool_call_id.as_deref() else {
492 return Err(crate::RuntimeError::new(
493 "durable_effects_missing_call_id",
494 "durable effects require a prepared tool call id",
495 ));
496 };
497 if tool_call_id.trim().is_empty() {
498 return Err(crate::RuntimeError::new(
499 "durable_effects_missing_call_id",
500 "durable effects require a non-empty prepared tool call id",
501 ));
502 }
503 let scoped = self.effect_controller.scoped();
504 if !scoped.controller().supports_durable_effects() {
505 return Err(crate::RuntimeError::new(
506 "durable_effects_unavailable",
507 "this effect controller does not support durable tool effects",
508 ));
509 }
510 if self.parent_invocation.as_ref().is_some_and(|invocation| {
511 invocation.effect_kind() == Some(crate::RuntimeEffectKind::ToolAttempt)
512 }) && scoped.controller().durability_tier() == crate::DurabilityTier::Durable
513 {
514 return Err(crate::RuntimeError::new(
515 "durable_effects_unavailable_in_tool_attempt",
516 "durable tool sub-effects are not available inside a journaled tool attempt",
517 ));
518 }
519 Ok(ToolDurableEffects { context: self })
520 }
521
/// Returns the cancellation token, if one was set.
pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
    self.cancellation_token.as_ref()
}

/// Returns the async process id, if one was set.
pub fn async_process_id(&self) -> Option<&str> {
    self.async_process_id.as_deref()
}
529
530 pub fn runtime_process_id(&self) -> Option<&str> {
531 self.async_process_id
532 .as_deref()
533 .or(self.runtime_process_id.as_deref())
534 .or_else(|| {
535 self.process_events
536 .as_ref()
537 .map(|context| context.process_id.as_str())
538 })
539 }
540
/// Returns the tool call id, if one was set.
pub fn tool_call_id(&self) -> Option<&str> {
    self.tool_call_id.as_deref()
}

/// Returns the prepared payload (`Null` when none was attached).
pub fn prepared_payload(&self) -> &serde_json::Value {
    &self.prepared_payload
}

/// Returns the tool execution binding (`Null` when none was attached).
pub fn tool_execution_binding(&self) -> &serde_json::Value {
    &self.tool_execution_binding
}
552
553 pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
554 where
555 T: serde::de::DeserializeOwned,
556 {
557 serde_json::from_value(self.prepared_payload.clone())
558 }
559
/// Returns the attempt number (1 unless `with_retry_context` changed it).
pub fn attempt_number(&self) -> u32 {
    self.attempt_number
}

/// Returns the maximum attempts (1 unless `with_retry_context` changed it).
pub fn max_attempts(&self) -> u32 {
    self.max_attempts
}

/// Returns the replay key, if `with_retry_context` derived one.
pub fn replay_key(&self) -> Option<&str> {
    self.replay_key.as_deref()
}
571
572 pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
586 let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
587 crate::RuntimeError::new(
588 "tool_completion_key_missing_call_id",
589 "completion keys require a prepared tool call id",
590 )
591 })?;
592 let scoped = self.effect_controller.scoped();
593 let key = scoped
594 .controller()
595 .await_event_key(
596 scoped.execution_scope(),
597 crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
598 )
599 .await?;
600 self.completion.store(key)
601 }
602
/// Removes and returns the recorded completion key, if any; forwards to
/// `ToolCompletionState::take`, including its poisoned-lock error.
pub(crate) fn take_completion_key(
    &self,
) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
    self.completion.take()
}
608
609 pub fn with_async_process(
610 mut self,
611 process_id: impl Into<String>,
612 cancellation_token: tokio_util::sync::CancellationToken,
613 ) -> Self {
614 self.async_process_id = Some(process_id.into());
615 self.runtime_process_id = self.async_process_id.clone();
616 self.cancellation_token = Some(cancellation_token);
617 self
618 }
619
/// Test-only: installs process-event plumbing for `process_id`.
///
/// Uses a polling awaiter over `registry`, no persistence store, no session
/// store factory, a `NoopSessionManager` as session graph, and no
/// queued-work driver.
#[cfg(any(test, feature = "testing"))]
#[doc(hidden)]
pub fn with_process_events_for_testing(
    mut self,
    process_id: impl Into<String>,
    registry: Arc<dyn crate::ProcessRegistry>,
) -> Self {
    // The awaiter gets its own handle; `registry` itself moves into the context.
    let awaiter = crate::ProcessAwaiter::polling(Arc::clone(&registry));
    self.process_events = Some(ToolProcessEventContext {
        process_id: process_id.into(),
        registry,
        awaiter,
        store: None,
        session_store_factory: None,
        session_graph: Arc::new(crate::plugin::NoopSessionManager),
        queued_work_driver: None,
    });
    self
}
639
640 pub(crate) fn with_retry_context(
641 mut self,
642 tool_name: &str,
643 attempt_number: u32,
644 max_attempts: u32,
645 ) -> Self {
646 self.attempt_number = attempt_number.max(1);
647 self.max_attempts = max_attempts.max(1);
648 self.replay_key = self
649 .tool_call_id
650 .as_ref()
651 .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
652 self
653 }
654
/// Replaces the prepared payload and returns the updated context.
pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
    self.prepared_payload = payload;
    self
}

/// Replaces the tool execution binding and returns the updated context.
pub(crate) fn with_tool_execution_binding(mut self, binding: serde_json::Value) -> Self {
    self.tool_execution_binding = binding;
    self
}
664
/// Test-only constructor using the default process cancel ability.
///
/// Delegates to `__for_testing_with_process_cancel_ability` with
/// `crate::DefaultProcessCancelAbility`, so both test constructors share one
/// construction path and cannot drift apart.
#[cfg(any(test, feature = "testing"))]
#[doc(hidden)]
#[expect(
    clippy::too_many_arguments,
    reason = "test-only constructor mirrors the sealed runtime tool context"
)]
pub fn __for_testing(
    session_id: String,
    sessions: Arc<dyn SessionStateService>,
    session_lifecycle: Arc<dyn SessionLifecycleService>,
    session_graph: Arc<dyn SessionGraphService>,
    processes: Arc<dyn crate::ProcessService>,
    attachment_store: Arc<dyn AttachmentStore>,
    direct_completions: crate::DirectCompletionClient<'static>,
    tool_call_id: Option<String>,
) -> ToolContext<'static> {
    ToolContext::__for_testing_with_process_cancel_ability(
        session_id,
        sessions,
        session_lifecycle,
        session_graph,
        processes,
        Arc::new(crate::DefaultProcessCancelAbility),
        attachment_store,
        direct_completions,
        tool_call_id,
    )
}
699
/// Test-only constructor with a caller-supplied process cancel ability.
///
/// Builds the context through `ToolContext::builder` with a shared
/// `InlineRuntimeEffectController` and the given tool call id.
#[cfg(any(test, feature = "testing"))]
#[doc(hidden)]
#[expect(
    clippy::too_many_arguments,
    reason = "test-only constructor mirrors the sealed runtime context"
)]
pub fn __for_testing_with_process_cancel_ability(
    session_id: String,
    sessions: Arc<dyn SessionStateService>,
    session_lifecycle: Arc<dyn SessionLifecycleService>,
    session_graph: Arc<dyn SessionGraphService>,
    processes: Arc<dyn crate::ProcessService>,
    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    attachment_store: Arc<dyn AttachmentStore>,
    direct_completions: crate::DirectCompletionClient<'static>,
    tool_call_id: Option<String>,
) -> ToolContext<'static> {
    let effect_controller = crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
        crate::InlineRuntimeEffectController,
    ));
    let builder = ToolContext::builder(
        session_id,
        sessions,
        session_lifecycle,
        session_graph,
        processes,
        process_cancel_ability,
        effect_controller,
        attachment_store,
        direct_completions,
    );
    builder.tool_call_id(tool_call_id).build()
}
736}
737
/// Durable-effects facade borrowed from a [`ToolContext`]; created by
/// `ToolContext::durable_effects` after its availability checks pass.
pub struct ToolDurableEffects<'ctx, 'run> {
    /// The tool context whose effect controller and call id are used.
    context: &'ctx ToolContext<'run>,
}
746
747impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
748 pub async fn run_json<F, Fut>(
749 &self,
750 step_id: impl Into<String>,
751 input: serde_json::Value,
752 run: F,
753 ) -> Result<serde_json::Value, crate::RuntimeError>
754 where
755 F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
756 Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
757 {
758 let step_id = step_id.into();
759 if step_id.trim().is_empty() {
760 return Err(crate::RuntimeError::new(
761 "durable_effect_empty_step_id",
762 "durable effect step id must be non-empty",
763 ));
764 }
765 self.context.durable_effects.reserve_step(&step_id)?;
766 let invocation = self.step_invocation(
767 format!("durable-step:{step_id}"),
768 crate::RuntimeEffectKind::DurableStep,
769 format!("durable-step:{step_id}"),
770 )?;
771 let outcome = self
772 .context
773 .effect_controller
774 .controller()
775 .execute_effect(
776 crate::RuntimeEffectEnvelope::new(
777 invocation,
778 crate::RuntimeEffectCommand::DurableStep {
779 step_id,
780 input: input.clone(),
781 },
782 ),
783 crate::RuntimeEffectLocalExecutor::durable_step(run),
784 )
785 .await
786 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
787 outcome
788 .into_durable_step()
789 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
790 }
791
792 pub async fn external_event_key(
793 &self,
794 key: impl Into<String>,
795 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
796 let key = key.into();
797 if key.trim().is_empty() {
798 return Err(crate::RuntimeError::new(
799 "durable_effect_empty_event_key",
800 "durable effect external event key must be non-empty",
801 ));
802 }
803 let scoped = self.context.effect_controller.scoped();
804 scoped
805 .controller()
806 .await_event_key(
807 scoped.execution_scope(),
808 crate::AwaitEventWaitIdentity::Custom { key },
809 )
810 .await
811 }
812
813 pub async fn await_event_json(
814 &self,
815 key: crate::AwaitEventKey,
816 ) -> Result<serde_json::Value, crate::RuntimeError> {
817 let invocation = self.step_invocation(
818 format!("await-event:{}", key.key_id),
819 crate::RuntimeEffectKind::AwaitEvent,
820 format!("await-event:{}", key.key_id),
821 )?;
822 let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
823 let clock = self
824 .context
825 .runtime_dispatch
826 .as_ref()
827 .map(|dispatch| Arc::clone(&dispatch.clock))
828 .unwrap_or_else(|| Arc::new(crate::SystemClock));
829 let outcome = self
830 .context
831 .effect_controller
832 .controller()
833 .execute_effect(
834 crate::RuntimeEffectEnvelope::new(
835 invocation,
836 crate::RuntimeEffectCommand::AwaitEvent { key },
837 ),
838 crate::RuntimeEffectLocalExecutor::await_event_with_clock(
839 cancellation,
840 None,
841 clock,
842 ),
843 )
844 .await
845 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
846 match outcome
847 .into_await_event()
848 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
849 {
850 crate::Resolution::Ok(value) => Ok(value),
851 crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
852 crate::Resolution::Timeout => Err(crate::RuntimeError::new(
853 "durable_effect_event_timeout",
854 "durable effect external event wait timed out",
855 )),
856 crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
857 "durable_effect_event_cancelled",
858 "durable effect external event wait was cancelled",
859 )),
860 }
861 }
862
863 pub async fn emit_process_event(
864 &self,
865 event_type: impl Into<String>,
866 payload: serde_json::Value,
867 ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
868 let Some(process) = self.context.process_events.as_ref() else {
869 return Err(crate::RuntimeError::new(
870 "durable_effect_process_event_unavailable",
871 "durable effect process events are unavailable outside a durable process",
872 ));
873 };
874 let event_type = event_type.into();
875 if event_type.trim().is_empty() {
876 return Err(crate::RuntimeError::new(
877 "durable_effect_empty_process_event_type",
878 "durable effect process event type must be non-empty",
879 ));
880 }
881 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
882 crate::RuntimeError::new(
883 "durable_effects_missing_call_id",
884 "durable effects require a prepared tool call id",
885 )
886 })?;
887 let sequence = self.context.durable_effects.next_process_event_sequence();
888 let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
889 format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
890 );
891 self.context
892 .process_events()
893 .emit_request(request)
894 .await
895 .map_err(|err| {
896 crate::RuntimeError::new(
897 "durable_effect_process_event_append_failed",
898 err.to_string(),
899 )
900 })
901 .and_then(|event| {
902 if event.process_id == process.process_id {
903 Ok(event)
904 } else {
905 Err(crate::RuntimeError::new(
906 "durable_effect_process_event_process_mismatch",
907 "process event append returned an event for a different process",
908 ))
909 }
910 })
911 }
912
913 fn step_invocation(
914 &self,
915 effect_id_suffix: impl Into<String>,
916 kind: crate::RuntimeEffectKind,
917 replay_suffix: impl AsRef<str>,
918 ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
919 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
920 crate::RuntimeError::new(
921 "durable_effects_missing_call_id",
922 "durable effects require a prepared tool call id",
923 )
924 })?;
925 let effect_id_suffix = effect_id_suffix.into();
926 if let Some(parent) = self.context.parent_invocation.as_ref() {
927 return Ok(crate::runtime::causal::child_effect_invocation(
928 parent,
929 format!("{tool_call_id}:{effect_id_suffix}"),
930 kind,
931 replay_suffix,
932 ));
933 }
934 let scoped = self.context.effect_controller.scoped();
935 let replay_key = format!(
936 "{}:tool:{tool_call_id}:{}",
937 scoped.scope_id(),
938 replay_suffix.as_ref()
939 );
940 Ok(crate::RuntimeInvocation::effect(
941 crate::RuntimeScope::new(self.context.session_id.clone()),
942 format!("{tool_call_id}:{effect_id_suffix}"),
943 kind,
944 replay_key,
945 ))
946 }
947}
948
/// A tool call together with its resolved tool id and an optional prepared
/// payload. Serializable with serde.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolCall {
    /// Id of the call.
    pub call_id: String,
    /// Id of the tool to run.
    pub tool_id: ToolId,
    /// Name of the tool.
    pub tool_name: String,
    /// JSON arguments of the call.
    pub args: serde_json::Value,
    /// Optional provider replay metadata; omitted from output when `None`
    /// and defaulted when missing from input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay: Option<ProviderReplayMeta>,
    /// Prepared payload; omitted from output when `Null` and defaulted when
    /// missing from input.
    #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
    pub prepared_payload: serde_json::Value,
}
965
966impl PreparedToolCall {
967 pub fn identity(tool_id: ToolId, call: crate::sansio::PendingToolCall) -> Self {
968 Self {
969 call_id: call.call_id,
970 tool_id,
971 tool_name: call.tool_name,
972 args: call.args,
973 replay: call.replay,
974 prepared_payload: serde_json::Value::Null,
975 }
976 }
977
978 pub fn from_parts(
979 call_id: impl Into<String>,
980 tool_id: impl Into<ToolId>,
981 tool_name: impl Into<String>,
982 args: serde_json::Value,
983 replay: Option<ProviderReplayMeta>,
984 prepared_payload: serde_json::Value,
985 ) -> Self {
986 Self {
987 call_id: call_id.into(),
988 tool_id: tool_id.into(),
989 tool_name: tool_name.into(),
990 args,
991 replay,
992 prepared_payload,
993 }
994 }
995}
996
/// One entry of a [`PreparedToolBatch`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolBatchCall {
    /// The prepared call.
    pub call: PreparedToolCall,
    /// Replay suffix for this entry; the batch constructors set it to
    /// `child:<index>:<call id>`.
    pub replay_suffix: String,
    /// Optional execution grant; omitted from output when `None` and
    /// defaulted when missing from input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_grant: Option<Box<ToolExecutionGrant>>,
}
1009
/// A batch of prepared tool calls under one batch id.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolBatch {
    /// Id of the batch.
    pub batch_id: String,
    /// Entries of the batch, in the order they were supplied.
    pub calls: Vec<PreparedToolBatchCall>,
}
1020
1021impl PreparedToolBatch {
1022 pub fn new(batch_id: impl Into<String>, calls: Vec<PreparedToolCall>) -> Self {
1023 let batch_id = batch_id.into();
1024 let calls = calls
1025 .into_iter()
1026 .enumerate()
1027 .map(|(index, call)| PreparedToolBatchCall {
1028 replay_suffix: format!("child:{index}:{}", call.call_id),
1029 call,
1030 execution_grant: None,
1031 })
1032 .collect();
1033 Self { batch_id, calls }
1034 }
1035
1036 pub fn new_with_grants(
1037 batch_id: impl Into<String>,
1038 calls: Vec<(PreparedToolCall, Option<ToolExecutionGrant>)>,
1039 ) -> Self {
1040 let batch_id = batch_id.into();
1041 let calls = calls
1042 .into_iter()
1043 .enumerate()
1044 .map(|(index, (call, execution_grant))| PreparedToolBatchCall {
1045 replay_suffix: format!("child:{index}:{}", call.call_id),
1046 call,
1047 execution_grant: execution_grant.map(Box::new),
1048 })
1049 .collect();
1050 Self { batch_id, calls }
1051 }
1052
1053 pub fn is_empty(&self) -> bool {
1054 self.calls.is_empty()
1055 }
1056
1057 pub fn len(&self) -> usize {
1058 self.calls.len()
1059 }
1060}
1061
/// Manifest, contract, and execution binding bundled for one tool.
/// Serializable with serde.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ToolExecutionGrant {
    /// The tool's manifest.
    pub manifest: ToolManifest,
    /// The tool's contract (boxed).
    pub contract: Box<ToolContract>,
    /// Optional source id; `None` unless set through `with_source_id`.
    pub source_id: Option<String>,
    /// JSON execution binding; `Null` unless set through `with_execution_binding`.
    pub execution_binding: serde_json::Value,
}
1082
1083impl ToolExecutionGrant {
1084 pub fn new(manifest: ToolManifest, contract: ToolContract) -> Self {
1085 Self {
1086 manifest,
1087 contract: Box::new(contract),
1088 source_id: None,
1089 execution_binding: serde_json::Value::Null,
1090 }
1091 }
1092
1093 pub fn from_definition(definition: ToolDefinition) -> Self {
1094 Self::new(definition.manifest(), definition.contract())
1095 }
1096
1097 pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
1098 self.source_id = Some(source_id.into());
1099 self
1100 }
1101
1102 pub fn with_execution_binding(mut self, execution_binding: serde_json::Value) -> Self {
1103 self.execution_binding = execution_binding;
1104 self
1105 }
1106}
1107
1108#[derive(Clone)]
1109pub struct ToolPrepareContext {
1110 session_id: String,
1111 sessions: Arc<dyn SessionStateService>,
1112 turn_context: crate::TurnContext,
1113 tool_call_id: Option<String>,
1114 tool_execution_binding: serde_json::Value,
1115}
1116
1117impl ToolPrepareContext {
1118 pub(crate) fn with_execution_binding(
1119 session_id: String,
1120 sessions: Arc<dyn SessionStateService>,
1121 turn_context: crate::TurnContext,
1122 tool_call_id: Option<String>,
1123 tool_execution_binding: serde_json::Value,
1124 ) -> Self {
1125 Self {
1126 session_id,
1127 sessions,
1128 turn_context,
1129 tool_call_id,
1130 tool_execution_binding,
1131 }
1132 }
1133
1134 pub fn session_id(&self) -> &str {
1135 &self.session_id
1136 }
1137
1138 pub fn tool_call_id(&self) -> Option<&str> {
1139 self.tool_call_id.as_deref()
1140 }
1141
1142 pub fn tool_execution_binding(&self) -> &serde_json::Value {
1143 &self.tool_execution_binding
1144 }
1145
1146 pub fn turn_context(&self) -> &crate::TurnContext {
1147 &self.turn_context
1148 }
1149
1150 pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
1151 where
1152 T: 'static,
1153 {
1154 self.turn_context.plugin_input::<T>(plugin_id)
1155 }
1156
1157 pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
1158 self.sessions.snapshot_session(&self.session_id).await
1159 }
1160
1161 pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
1162 self.sessions.tool_catalog(&self.session_id).await
1163 }
1164
1165 pub async fn shared_tool_catalog(
1166 &self,
1167 ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
1168 self.sessions.shared_tool_catalog(&self.session_id).await
1169 }
1170}
1171
1172pub struct ToolPrepareCall<'a> {
1174 pub tool_id: ToolId,
1175 pub pending: crate::sansio::PendingToolCall,
1176 pub context: &'a ToolPrepareContext,
1177}
1178
1179pub struct ToolCall<'a> {
1186 pub name: &'a str,
1187 pub args: &'a serde_json::Value,
1188 pub context: &'a ToolContext<'a>,
1189 pub progress: Option<&'a ProgressSender>,
1190}
1191
1192#[async_trait::async_trait]
1200pub trait ToolProvider: Send + Sync + 'static {
1201 fn tool_manifests(&self) -> Vec<ToolManifest>;
1202 fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1203 self.tool_manifests()
1204 .into_iter()
1205 .find(|manifest| manifest.name == name)
1206 }
1207 fn resolve_manifest_by_id(&self, id: &ToolId) -> Option<ToolManifest> {
1208 self.tool_manifests()
1209 .into_iter()
1210 .find(|manifest| manifest.id == *id)
1211 }
1212 fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1213 fn resolve_contract_by_id(&self, id: &ToolId) -> Option<Arc<ToolContract>> {
1214 let manifest = self.resolve_manifest_by_id(id)?;
1215 self.resolve_contract(&manifest.name)
1216 }
1217 async fn prepare_tool_call(
1218 &self,
1219 call: ToolPrepareCall<'_>,
1220 ) -> Result<PreparedToolCall, ToolResult> {
1221 Ok(PreparedToolCall::identity(call.tool_id, call.pending))
1222 }
1223 async fn prepare_granted_tool_call(
1224 &self,
1225 grant: &ToolExecutionGrant,
1226 call: ToolPrepareCall<'_>,
1227 ) -> Result<PreparedToolCall, ToolResult> {
1228 let _ = call;
1229 Err(ToolResult::err_fmt(format_args!(
1230 "Granted execution is unsupported for tool id `{}`",
1231 grant.manifest.id
1232 )))
1233 }
1234 async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1235 async fn execute_granted(
1236 &self,
1237 grant: &ToolExecutionGrant,
1238 args: &serde_json::Value,
1239 context: &ToolContext<'_>,
1240 progress: Option<&ProgressSender>,
1241 ) -> ToolResult {
1242 let _ = (args, context, progress);
1243 ToolResult::err_fmt(format_args!(
1244 "Granted execution is unsupported for tool id `{}`",
1245 grant.manifest.id
1246 ))
1247 }
1248 async fn execute_by_id(
1249 &self,
1250 tool_id: &ToolId,
1251 args: &serde_json::Value,
1252 context: &ToolContext<'_>,
1253 progress: Option<&ProgressSender>,
1254 ) -> ToolResult {
1255 let Some(manifest) = self.resolve_manifest_by_id(tool_id) else {
1256 return ToolResult::err_fmt(format!("Unknown tool id: {tool_id}"));
1257 };
1258 self.execute(ToolCall {
1259 name: &manifest.name,
1260 args,
1261 context,
1262 progress,
1263 })
1264 .await
1265 }
1266}
1267
#[cfg(test)]
mod tests {
    use super::*;
    use crate::AwaitEventResolver;
    use crate::ProcessRegistry;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Controller stub whose `execute_effect` always returns an error.
    struct NoDurableEffectController;

    // Uses only the trait's default items.
    impl crate::AwaitEventResolver for NoDurableEffectController {}

    #[async_trait::async_trait]
    impl crate::RuntimeEffectController for NoDurableEffectController {
        async fn execute_effect(
            &self,
            _envelope: crate::RuntimeEffectEnvelope,
            _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
        ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
            Err(crate::RuntimeEffectControllerError::new(
                "unexpected_effect",
                "test controller should not execute effects",
            ))
        }
    }

    /// Builds a `session-1` context around mock services and the given controller.
    fn test_context_with_controller(
        tool_call_id: Option<String>,
        controller: Arc<dyn crate::RuntimeEffectController>,
    ) -> ToolContext<'static> {
        // Each service slot receives its own fresh mock instance.
        let sessions = || Arc::new(crate::testing::MockSessionManager::default());
        ToolContext::builder(
            "session-1".to_string(),
            sessions(),
            sessions(),
            sessions(),
            Arc::new(crate::UnavailableProcessService),
            Arc::new(crate::DefaultProcessCancelAbility),
            crate::runtime::RuntimeEffectControllerHandle::shared(controller),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
        )
        .tool_call_id(tool_call_id)
        .build()
    }

    /// Shorthand for a context backed by `InlineRuntimeEffectController`.
    fn inline_context(tool_call_id: Option<&str>) -> ToolContext<'static> {
        test_context_with_controller(
            tool_call_id.map(str::to_string),
            Arc::new(crate::InlineRuntimeEffectController),
        )
    }

    #[test]
    fn tool_context_builder_carries_call_payload_and_cancellation_state() {
        let token = tokio_util::sync::CancellationToken::new();
        let prepared_call = PreparedToolCall::from_parts(
            "call-1",
            "tool:demo_tool",
            "demo_tool",
            serde_json::json!({ "input": true }),
            None,
            serde_json::json!({ "prepared": true }),
        );

        let sessions = || Arc::new(crate::testing::MockSessionManager::default());
        let context = ToolContext::builder(
            "session-1".to_string(),
            sessions(),
            sessions(),
            sessions(),
            Arc::new(crate::UnavailableProcessService),
            Arc::new(crate::DefaultProcessCancelAbility),
            crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
                crate::InlineRuntimeEffectController,
            )),
            Arc::new(crate::InMemoryAttachmentStore::new()),
            crate::DirectCompletionClient::unavailable(
                "direct completions are unavailable in this test context",
            ),
        )
        .prepared_call(&prepared_call)
        .cancellation_token(Some(token.clone()))
        .async_process("process-1", token.clone())
        .build();

        assert_eq!(context.session_id(), "session-1");
        assert_eq!(context.tool_call_id(), Some("call-1"));
        let expected_payload = serde_json::json!({ "prepared": true });
        assert_eq!(context.prepared_payload(), &expected_payload);
        assert_eq!(context.async_process_id(), Some("process-1"));
        assert!(context.cancellation_token().is_some());
    }

    #[test]
    fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
        // No call id: rejected even though the controller is the inline one.
        let missing_call = inline_context(None);
        let Err(err) = missing_call.durable_effects() else {
            panic!("missing prepared tool call id should fail")
        };
        assert_eq!(err.code.as_str(), "durable_effects_missing_call_id");

        // Call id present, but the controller is the rejecting stub.
        let unsupported = test_context_with_controller(
            Some("call-1".to_string()),
            Arc::new(NoDurableEffectController),
        );
        let Err(err) = unsupported.durable_effects() else {
            panic!("unsupported controller should fail")
        };
        assert_eq!(err.code.as_str(), "durable_effects_unavailable");
    }

    #[tokio::test]
    async fn durable_run_json_executes_and_maps_closure_errors() {
        let context = inline_context(Some("call-run-json"));
        let durable = context.durable_effects().expect("durable effects");

        let output = durable
            .run_json(
                "create",
                serde_json::json!({ "x": 1 }),
                |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
            )
            .await
            .expect("durable step");
        assert_eq!(output, serde_json::json!({ "seen": 1 }));

        let failure = durable
            .run_json("fail", serde_json::json!({}), |_| async {
                Err(crate::RuntimeError::new(
                    "durable_step_failed",
                    "step failed",
                ))
            })
            .await
            .expect_err("closure error");
        assert_eq!(failure.code.as_str(), "durable_step_failed");
        assert_eq!(failure.message, "step failed");
    }

    #[tokio::test]
    async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
        let context = inline_context(Some("call-step-ids"));
        let durable = context.durable_effects().expect("durable effects");
        // Counts how many step closures actually ran.
        let runs = Arc::new(AtomicU64::new(0));

        let empty_id = durable
            .run_json("", serde_json::Value::Null, {
                let runs = Arc::clone(&runs);
                move |_| async move {
                    runs.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect_err("empty step id");
        assert_eq!(empty_id.code.as_str(), "durable_effect_empty_step_id");
        assert_eq!(runs.load(Ordering::Relaxed), 0);

        durable
            .run_json("same", serde_json::Value::Null, {
                let runs = Arc::clone(&runs);
                move |_| async move {
                    runs.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect("first step");

        let duplicate = durable
            .run_json("same", serde_json::Value::Null, {
                let runs = Arc::clone(&runs);
                move |_| async move {
                    runs.fetch_add(1, Ordering::Relaxed);
                    Ok(serde_json::Value::Null)
                }
            })
            .await
            .expect_err("duplicate step id");
        assert_eq!(
            duplicate.code.as_str(),
            "durable_effect_duplicate_step_id"
        );
        // Only the first "same" step ran; the duplicate never reached its closure.
        assert_eq!(runs.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn durable_external_event_key_is_custom_and_stable() {
        let context = inline_context(Some("call-event-key"));
        let durable = context.durable_effects().expect("durable effects");

        let first = durable
            .external_event_key("tool-event-stable")
            .await
            .expect("first key");
        let second = durable
            .external_event_key("tool-event-stable")
            .await
            .expect("second key");

        assert_eq!(first, second);
        let expected_wait = crate::AwaitEventWaitIdentity::Custom {
            key: "tool-event-stable".to_string(),
        };
        assert_eq!(first.wait, expected_wait);
    }

    #[tokio::test]
    async fn durable_await_event_json_maps_terminal_resolutions() {
        // The controller handle is kept so the test can resolve events itself.
        let controller = Arc::new(crate::InlineRuntimeEffectController);
        let context =
            test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
        let durable = context.durable_effects().expect("durable effects");

        // Ok resolution: the payload is returned unchanged.
        let ok_key = durable
            .external_event_key("tool-event-ok")
            .await
            .expect("ok key");
        controller
            .resolve_await_event(
                &ok_key,
                crate::Resolution::Ok(serde_json::json!({ "answer": 42 })),
            )
            .await
            .expect("resolve ok");
        let payload = durable
            .await_event_json(ok_key)
            .await
            .expect("await ok value");
        assert_eq!(payload, serde_json::json!({ "answer": 42 }));

        // Err resolution: the external error code is surfaced.
        let err_key = durable
            .external_event_key("tool-event-err")
            .await
            .expect("err key");
        controller
            .resolve_await_event(
                &err_key,
                crate::Resolution::Err(crate::ExternalCompletionError::new(
                    "external_bad",
                    "external failed",
                )),
            )
            .await
            .expect("resolve err");
        let external = durable
            .await_event_json(err_key)
            .await
            .expect_err("await err value");
        assert_eq!(external.code.as_str(), "external_bad");

        // Cancelled resolution.
        let cancelled_key = durable
            .external_event_key("tool-event-cancelled")
            .await
            .expect("cancelled key");
        controller
            .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
            .await
            .expect("resolve cancelled");
        let cancelled = durable
            .await_event_json(cancelled_key)
            .await
            .expect_err("await cancelled value");
        assert_eq!(cancelled.code.as_str(), "durable_effect_event_cancelled");

        // Timeout resolution.
        let timeout_key = durable
            .external_event_key("tool-event-timeout")
            .await
            .expect("timeout key");
        controller
            .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
            .await
            .expect("resolve timeout");
        let timed_out = durable
            .await_event_json(timeout_key)
            .await
            .expect_err("await timeout value");
        assert_eq!(timed_out.code.as_str(), "durable_effect_event_timeout");
    }

    #[tokio::test]
    async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
        // Without process events attached, emitting must fail.
        let detached = inline_context(Some("call-no-process"));
        let outside = detached
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("tool.event", serde_json::json!({}))
            .await
            .expect_err("outside process");
        assert_eq!(
            outside.code.as_str(),
            "durable_effect_process_event_unavailable"
        );

        // Register a process that declares `tool.event` as an extra event type.
        let registry = Arc::new(crate::TestLocalProcessRegistry::default());
        let process_id = "process:durable-tool-event";
        let declared_event = crate::ProcessEventType {
            name: "tool.event".to_string(),
            payload_schema: crate::LashSchema::any(),
            semantics: crate::ProcessEventSemanticsSpec::default(),
        };
        registry
            .register_process(
                crate::ProcessRegistration::new(
                    process_id,
                    crate::ProcessInput::External {
                        metadata: serde_json::json!({}),
                    },
                    crate::RecoveryDisposition::ExternallyOwned,
                    crate::ProcessProvenance::host(),
                )
                .with_extra_event_types([declared_event]),
            )
            .await
            .expect("register process");
        let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;
        let context = inline_context(Some("call-process-event"))
            .with_process_events_for_testing(process_id, registry_dyn);

        let event = context
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
            .await
            .expect("process event");
        assert_eq!(event.process_id, process_id);
        assert_eq!(event.event_type, "tool.event");
        assert_eq!(event.payload, serde_json::json!({ "ok": true }));
        assert_eq!(
            event.invocation.replay_key(),
            Some("tool:call-process-event:durable-process-event:0")
        );

        // An event type the process never declared is rejected at append time.
        let append_err = context
            .durable_effects()
            .expect("durable effects")
            .emit_process_event("undeclared.event", serde_json::json!({}))
            .await
            .expect_err("undeclared event type must fail the append");
        assert_eq!(
            append_err.code.as_str(),
            "durable_effect_process_event_append_failed"
        );
    }
}