1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10 PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
/// A text message paired with a free-form `kind` label.
///
/// This is the item type carried by [`ProgressSender`].
#[derive(Clone, Debug)]
pub struct SandboxMessage {
    /// Message body.
    pub text: String,
    /// Label describing the message.
    /// NOTE(review): the set of accepted values is not visible in this file —
    /// confirm against the producers and consumers of this type.
    pub kind: String,
}
37
/// Unbounded Tokio MPSC sender of [`SandboxMessage`] values.
pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
/// Shared slot holding at most one [`crate::AwaitEventKey`].
///
/// The slot lives behind an `Arc<Mutex<_>>`, so clones of this value observe
/// and mutate the same key.
#[derive(Clone, Default)]
pub(crate) struct ToolCompletionState {
    /// The stored key; `None` until `store` runs and again after `take`.
    key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
}
45
46impl ToolCompletionState {
47 fn store(
48 &self,
49 key: crate::AwaitEventKey,
50 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51 let mut guard = self.key.lock().map_err(|_| {
52 crate::RuntimeError::new(
53 "tool_completion_state_poisoned",
54 "tool completion key state lock poisoned",
55 )
56 })?;
57 if let Some(existing) = guard.as_ref() {
58 return Ok(existing.clone());
59 }
60 *guard = Some(key.clone());
61 Ok(key)
62 }
63
64 pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65 self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66 crate::RuntimeError::new(
67 "tool_completion_state_poisoned",
68 "tool completion key state lock poisoned",
69 )
70 })
71 }
72}
73
/// Shared bookkeeping for durable effects: the set of step ids already
/// reserved and a counter used to number process events.
///
/// Both fields sit behind `Arc`, so clones of this value share the same state.
#[derive(Clone, Default)]
pub(crate) struct ToolDurableEffectState {
    /// Step ids recorded by `reserve_step`; a repeated id is rejected there.
    step_ids: Arc<Mutex<HashSet<String>>>,
    /// Counter advanced by `next_process_event_sequence`.
    process_event_sequence: Arc<AtomicU64>,
}
79
80impl ToolDurableEffectState {
81 fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82 let mut guard = self.step_ids.lock().map_err(|_| {
83 crate::RuntimeError::new(
84 "durable_effect_state_poisoned",
85 "durable effect step state lock poisoned",
86 )
87 })?;
88 if !guard.insert(step_id.to_string()) {
89 return Err(crate::RuntimeError::new(
90 "durable_effect_duplicate_step_id",
91 format!("durable effect step id `{step_id}` was already used by this tool call"),
92 ));
93 }
94 Ok(())
95 }
96
97 fn next_process_event_sequence(&self) -> u64 {
98 self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99 }
100}
101
/// Context handed to a tool while it runs: session identity, service
/// handles, and metadata about the current tool call.
///
/// All fields are crate-private; callers read them through the accessor
/// methods on this type. Values are assembled by `ToolContextBuilder`.
#[derive(Clone)]
pub struct ToolContext<'run> {
    /// Id of the session this context belongs to.
    pub(crate) session_id: String,
    /// Agent frame id, returned by `agent_frame_id()` and forwarded to the
    /// process admin client.
    pub(crate) agent_frame_id: crate::AgentFrameId,
    /// Session state service, forwarded to the session admin client.
    pub(crate) sessions: Arc<dyn SessionStateService>,
    /// Session lifecycle service, forwarded to the session admin client.
    pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
    /// Process service, forwarded to the process admin client.
    pub(crate) processes: Arc<dyn crate::ProcessService>,
    /// Process cancellation capability, forwarded to the process admin client.
    pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    /// Effect controller handle used to obtain await-event keys and to
    /// execute durable effects.
    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
    /// Dispatch context this tool context was derived from, when built via
    /// `from_dispatch`; `None` when built through the test builder.
    pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
    /// Optional cancellation token exposed through `cancellation_token()`.
    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
    /// Id of the async process, when one was attached.
    pub(crate) async_process_id: Option<String>,
    /// Explicit runtime process id; consulted by `runtime_process_id()` after
    /// `async_process_id`.
    pub(crate) runtime_process_id: Option<String>,
    /// Process-event context; `None` when process events are unavailable.
    pub(crate) process_events: Option<ToolProcessEventContext>,
    /// Attachment store backing `attachments()`.
    pub(crate) attachment_store: Arc<dyn AttachmentStore>,
    /// Direct-completion client backing `direct_completions()`.
    pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
    /// Payload taken from the prepared tool call; `Null` when none was set.
    pub(crate) prepared_payload: serde_json::Value,
    /// Id of the prepared tool call, if any.
    pub(crate) tool_call_id: Option<String>,
    /// Attempt number; `1` unless overridden by `with_retry_context`.
    pub(crate) attempt_number: u32,
    /// Maximum attempts; `1` unless overridden by `with_retry_context`.
    pub(crate) max_attempts: u32,
    /// Replay key; `None` until `with_retry_context` derives one.
    pub(crate) replay_key: Option<String>,
    /// Slot for the completion key produced by `completion_key()`.
    pub(crate) completion: ToolCompletionState,
    /// Reserved step ids and process-event sequence for durable effects.
    pub(crate) durable_effects: ToolDurableEffectState,
    /// Parent invocation; when present, durable-effect invocations are
    /// derived from it as children.
    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
    /// Execution environment spec, forwarded to the process admin client.
    pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
    /// Optional hook invoked by `emit_child_process_started`.
    pub(crate) child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
}
132
/// Event payload describing a child process that was started.
///
/// Delivered to the callback registered on `ToolChildExecutionTraceHook`.
/// Derives `Debug`, `PartialEq` and `Eq` so events can be logged and compared
/// in assertions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ToolChildProcessStarted {
    /// Id of the process that was started.
    pub process_id: String,
    /// Name of the child entry, when one was supplied by the caller.
    pub child_entry_name: Option<String>,
}
138
/// Clonable wrapper around a callback that is told when a child process starts.
///
/// The callback is stored in an `Arc`, so clones share the same closure.
#[derive(Clone)]
pub struct ToolChildExecutionTraceHook {
    /// Callback invoked with each [`ToolChildProcessStarted`] event.
    on_child_process_started: Arc<dyn Fn(ToolChildProcessStarted) + Send + Sync>,
}
143
144impl ToolChildExecutionTraceHook {
145 pub fn new(
146 on_child_process_started: impl Fn(ToolChildProcessStarted) + Send + Sync + 'static,
147 ) -> Self {
148 Self {
149 on_child_process_started: Arc::new(on_child_process_started),
150 }
151 }
152
153 pub fn child_process_started(&self, event: ToolChildProcessStarted) {
154 (self.on_child_process_started)(event);
155 }
156}
157
/// Dependencies bundled for emitting process events on behalf of a tool.
///
/// NOTE(review): how each handle is consumed is implemented in the
/// `process_events` submodule and is not visible here — confirm there.
#[derive(Clone)]
pub(crate) struct ToolProcessEventContext {
    /// Id of the process the events belong to; also the last fallback of
    /// `ToolContext::runtime_process_id`.
    process_id: String,
    /// Process registry handle.
    registry: Arc<dyn crate::ProcessRegistry>,
    /// Optional runtime persistence handle.
    store: Option<Arc<dyn crate::RuntimePersistence>>,
    /// Optional session store factory.
    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
    /// Session graph service.
    session_graph: Arc<dyn SessionGraphService>,
    /// Optional queued-work poke handle.
    queued_work_poke: Option<crate::QueuedWorkPoke>,
}
167
/// Builder that accumulates the pieces of a [`ToolContext`].
///
/// The fields mirror `ToolContext`, plus `session_graph`, which is only used
/// to populate the process-event context and is not copied into the built
/// context. Retry metadata (`attempt_number`, `max_attempts`, `replay_key`)
/// is not configurable here; `build` fills in fixed initial values.
pub(crate) struct ToolContextBuilder<'run> {
    /// Id of the session the context will belong to.
    session_id: String,
    /// Agent frame id for the context.
    agent_frame_id: crate::AgentFrameId,
    /// Session state service.
    sessions: Arc<dyn SessionStateService>,
    /// Session lifecycle service.
    session_lifecycle: Arc<dyn SessionLifecycleService>,
    /// Session graph service; cloned into the process-event context.
    session_graph: Arc<dyn SessionGraphService>,
    /// Process service.
    processes: Arc<dyn crate::ProcessService>,
    /// Process cancellation capability.
    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    /// Effect controller handle.
    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
    /// Originating dispatch context, when built via `from_dispatch`.
    runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
    /// Optional cancellation token.
    cancellation_token: Option<tokio_util::sync::CancellationToken>,
    /// Async process id, set by `async_process`.
    async_process_id: Option<String>,
    /// Runtime process id, set by `runtime_process_id`.
    runtime_process_id: Option<String>,
    /// Process-event context, set by `process_events`.
    process_events: Option<ToolProcessEventContext>,
    /// Attachment store.
    attachment_store: Arc<dyn AttachmentStore>,
    /// Direct-completion client.
    direct_completions: crate::DirectCompletionClient<'run>,
    /// Prepared payload; `Null` until `prepared_call` supplies one.
    prepared_payload: serde_json::Value,
    /// Tool call id; `None` until `prepared_call` (or the test-only setter) supplies one.
    tool_call_id: Option<String>,
    /// Completion-key slot, default-constructed.
    completion: ToolCompletionState,
    /// Durable-effect bookkeeping, default-constructed.
    durable_effects: ToolDurableEffectState,
    /// Parent invocation, if any.
    parent_invocation: Option<crate::RuntimeInvocation>,
    /// Execution environment spec.
    execution_env_spec: crate::ProcessExecutionEnvSpec,
    /// Optional child-execution trace hook.
    child_execution_trace_hook: Option<ToolChildExecutionTraceHook>,
}
192
193impl<'run> ToolContextBuilder<'run> {
194 pub(crate) fn from_dispatch(
195 dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
196 ) -> Self {
197 Self {
198 session_id: dispatch.session_id.clone(),
199 agent_frame_id: dispatch.agent_frame_id.clone(),
200 sessions: Arc::clone(&dispatch.sessions),
201 session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
202 session_graph: Arc::clone(&dispatch.session_graph),
203 processes: Arc::clone(&dispatch.processes),
204 process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
205 effect_controller: dispatch.effect_controller.clone(),
206 runtime_dispatch: Some(Arc::clone(&dispatch)),
207 cancellation_token: None,
208 async_process_id: None,
209 runtime_process_id: None,
210 process_events: None,
211 attachment_store: Arc::clone(&dispatch.attachment_store),
212 direct_completions: dispatch.direct_completions.clone(),
213 prepared_payload: serde_json::Value::Null,
214 tool_call_id: None,
215 completion: ToolCompletionState::default(),
216 durable_effects: ToolDurableEffectState::default(),
217 parent_invocation: dispatch.parent_invocation.clone(),
218 execution_env_spec: dispatch.execution_env_spec.clone(),
219 child_execution_trace_hook: None,
220 }
221 }
222
223 #[cfg(any(test, feature = "testing"))]
224 pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
225 self.tool_call_id = tool_call_id.into();
226 self
227 }
228
229 pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
230 self.tool_call_id = Some(call.call_id.clone());
231 self.prepared_payload = call.prepared_payload.clone();
232 self
233 }
234
235 pub(crate) fn cancellation_token(
236 mut self,
237 cancellation_token: Option<tokio_util::sync::CancellationToken>,
238 ) -> Self {
239 self.cancellation_token = cancellation_token;
240 self
241 }
242
243 pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
244 self.runtime_process_id = process_id;
245 self
246 }
247
248 pub(crate) fn async_process(
249 mut self,
250 process_id: impl Into<String>,
251 cancellation_token: tokio_util::sync::CancellationToken,
252 ) -> Self {
253 self.async_process_id = Some(process_id.into());
254 self.cancellation_token = Some(cancellation_token);
255 self
256 }
257
258 pub(crate) fn process_events(
259 mut self,
260 process_id: impl Into<String>,
261 registry: Arc<dyn crate::ProcessRegistry>,
262 store: Option<Arc<dyn crate::RuntimePersistence>>,
263 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
264 queued_work_poke: Option<crate::QueuedWorkPoke>,
265 ) -> Self {
266 self.process_events = Some(ToolProcessEventContext {
267 process_id: process_id.into(),
268 registry,
269 store,
270 session_store_factory,
271 session_graph: Arc::clone(&self.session_graph),
272 queued_work_poke,
273 });
274 self
275 }
276
277 pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
278 self.parent_invocation = metadata;
279 self
280 }
281
282 pub(crate) fn child_execution_trace_hook(
283 mut self,
284 hook: Option<ToolChildExecutionTraceHook>,
285 ) -> Self {
286 self.child_execution_trace_hook = hook;
287 self
288 }
289
290 pub(crate) fn build(self) -> ToolContext<'run> {
291 ToolContext {
292 session_id: self.session_id,
293 agent_frame_id: self.agent_frame_id,
294 sessions: self.sessions,
295 session_lifecycle: self.session_lifecycle,
296 processes: self.processes,
297 process_cancel_ability: self.process_cancel_ability,
298 effect_controller: self.effect_controller,
299 runtime_dispatch: self.runtime_dispatch,
300 cancellation_token: self.cancellation_token,
301 async_process_id: self.async_process_id,
302 runtime_process_id: self.runtime_process_id,
303 process_events: self.process_events,
304 attachment_store: self.attachment_store,
305 direct_completions: self.direct_completions,
306 prepared_payload: self.prepared_payload,
307 tool_call_id: self.tool_call_id,
308 attempt_number: 1,
309 max_attempts: 1,
310 replay_key: None,
311 completion: self.completion,
312 durable_effects: self.durable_effects,
313 parent_invocation: self.parent_invocation,
314 execution_env_spec: self.execution_env_spec,
315 child_execution_trace_hook: self.child_execution_trace_hook,
316 }
317 }
318}
319
320impl<'run> ToolContext<'run> {
321 #[cfg(any(test, feature = "testing"))]
322 #[expect(
323 clippy::too_many_arguments,
324 reason = "testing constructor mirrors the sealed runtime tool context dependencies"
325 )]
326 pub(crate) fn builder(
327 session_id: String,
328 sessions: Arc<dyn SessionStateService>,
329 session_lifecycle: Arc<dyn SessionLifecycleService>,
330 session_graph: Arc<dyn SessionGraphService>,
331 processes: Arc<dyn crate::ProcessService>,
332 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
333 effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
334 attachment_store: Arc<dyn AttachmentStore>,
335 direct_completions: crate::DirectCompletionClient<'run>,
336 ) -> ToolContextBuilder<'run> {
337 ToolContextBuilder {
338 session_id,
339 agent_frame_id: String::new(),
340 sessions,
341 session_lifecycle,
342 session_graph,
343 processes,
344 process_cancel_ability,
345 effect_controller,
346 runtime_dispatch: None,
347 cancellation_token: None,
348 async_process_id: None,
349 runtime_process_id: None,
350 process_events: None,
351 attachment_store,
352 direct_completions,
353 prepared_payload: serde_json::Value::Null,
354 tool_call_id: None,
355 completion: ToolCompletionState::default(),
356 durable_effects: ToolDurableEffectState::default(),
357 parent_invocation: None,
358 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
359 crate::PluginOptions::default(),
360 crate::SessionPolicy::default(),
361 ),
362 child_execution_trace_hook: None,
363 }
364 }
365
366 pub(crate) fn from_dispatch(
367 dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
368 ) -> ToolContextBuilder<'run> {
369 ToolContextBuilder::from_dispatch(dispatch)
370 }
371
372 pub fn session_id(&self) -> &str {
373 &self.session_id
374 }
375
376 pub fn agent_frame_id(&self) -> &str {
377 &self.agent_frame_id
378 }
379
380 pub fn sessions(&self) -> ToolSessionAdmin<'run> {
381 ToolSessionAdmin {
382 session_id: self.session_id.clone(),
383 sessions: Arc::clone(&self.sessions),
384 session_lifecycle: Arc::clone(&self.session_lifecycle),
385 effect_controller: self.effect_controller.clone(),
386 }
387 }
388
389 pub fn dispatch(&self) -> ToolDispatchClient<'run> {
390 ToolDispatchClient {
391 context: self.clone(),
392 }
393 }
394
395 pub fn triggers(&self) -> ToolTriggerClient<'run> {
396 ToolTriggerClient {
397 context: self.clone(),
398 }
399 }
400
401 pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
402 ToolSessionProcessAdmin {
403 session_id: self.session_id.clone(),
404 agent_frame_id: self.agent_frame_id.clone(),
405 processes: Arc::clone(&self.processes),
406 process_cancel_ability: Arc::clone(&self.process_cancel_ability),
407 effect_controller: self.effect_controller.clone(),
408 parent_invocation: self.parent_invocation.clone(),
409 tool_call_id: self.tool_call_id.clone(),
410 execution_env_spec: self.execution_env_spec.clone(),
411 }
412 }
413
414 pub fn emit_child_process_started(
415 &self,
416 process_id: impl Into<String>,
417 child_entry_name: Option<String>,
418 ) {
419 let Some(hook) = &self.child_execution_trace_hook else {
420 return;
421 };
422 hook.child_process_started(ToolChildProcessStarted {
423 process_id: process_id.into(),
424 child_entry_name,
425 });
426 }
427
428 pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
429 ToolDirectCompletionClient {
430 session_id: self.session_id.clone(),
431 tool_call_id: self.tool_call_id.clone(),
432 direct_completions: self.direct_completions.clone(),
433 }
434 }
435
436 pub fn attachments(&self) -> ToolAttachmentClient {
437 ToolAttachmentClient {
438 store: Arc::clone(&self.attachment_store),
439 }
440 }
441
442 pub fn process_events(&self) -> ToolProcessEventClient {
443 ToolProcessEventClient {
444 context: self.process_events.clone(),
445 }
446 }
447
448 pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
455 let Some(tool_call_id) = self.tool_call_id.as_deref() else {
456 return Err(crate::RuntimeError::new(
457 "durable_effects_missing_call_id",
458 "durable effects require a prepared tool call id",
459 ));
460 };
461 if tool_call_id.trim().is_empty() {
462 return Err(crate::RuntimeError::new(
463 "durable_effects_missing_call_id",
464 "durable effects require a non-empty prepared tool call id",
465 ));
466 }
467 let scoped = self.effect_controller.scoped();
468 if !scoped.controller().supports_durable_effects() {
469 return Err(crate::RuntimeError::new(
470 "durable_effects_unavailable",
471 "this effect controller does not support durable tool effects",
472 ));
473 }
474 Ok(ToolDurableEffects { context: self })
475 }
476
477 pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
478 self.cancellation_token.as_ref()
479 }
480
481 pub fn async_process_id(&self) -> Option<&str> {
482 self.async_process_id.as_deref()
483 }
484
485 pub fn runtime_process_id(&self) -> Option<&str> {
486 self.async_process_id
487 .as_deref()
488 .or(self.runtime_process_id.as_deref())
489 .or_else(|| {
490 self.process_events
491 .as_ref()
492 .map(|context| context.process_id.as_str())
493 })
494 }
495
496 pub fn tool_call_id(&self) -> Option<&str> {
497 self.tool_call_id.as_deref()
498 }
499
500 pub fn prepared_payload(&self) -> &serde_json::Value {
501 &self.prepared_payload
502 }
503
504 pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
505 where
506 T: serde::de::DeserializeOwned,
507 {
508 serde_json::from_value(self.prepared_payload.clone())
509 }
510
511 pub fn attempt_number(&self) -> u32 {
512 self.attempt_number
513 }
514
515 pub fn max_attempts(&self) -> u32 {
516 self.max_attempts
517 }
518
519 pub fn replay_key(&self) -> Option<&str> {
520 self.replay_key.as_deref()
521 }
522
523 pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
537 let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
538 crate::RuntimeError::new(
539 "tool_completion_key_missing_call_id",
540 "completion keys require a prepared tool call id",
541 )
542 })?;
543 let scoped = self.effect_controller.scoped();
544 let key = scoped
545 .controller()
546 .await_event_key(
547 scoped.execution_scope(),
548 crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
549 )
550 .await?;
551 self.completion.store(key)
552 }
553
554 pub(crate) fn take_completion_key(
555 &self,
556 ) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
557 self.completion.take()
558 }
559
560 pub fn with_async_process(
561 mut self,
562 process_id: impl Into<String>,
563 cancellation_token: tokio_util::sync::CancellationToken,
564 ) -> Self {
565 self.async_process_id = Some(process_id.into());
566 self.runtime_process_id = self.async_process_id.clone();
567 self.cancellation_token = Some(cancellation_token);
568 self
569 }
570
571 #[cfg(any(test, feature = "testing"))]
572 #[doc(hidden)]
573 pub fn with_process_events_for_testing(
574 mut self,
575 process_id: impl Into<String>,
576 registry: Arc<dyn crate::ProcessRegistry>,
577 ) -> Self {
578 self.process_events = Some(ToolProcessEventContext {
579 process_id: process_id.into(),
580 registry,
581 store: None,
582 session_store_factory: None,
583 session_graph: Arc::new(crate::plugin::NoopSessionManager),
584 queued_work_poke: None,
585 });
586 self
587 }
588
589 pub(crate) fn with_retry_context(
590 mut self,
591 tool_name: &str,
592 attempt_number: u32,
593 max_attempts: u32,
594 ) -> Self {
595 self.attempt_number = attempt_number.max(1);
596 self.max_attempts = max_attempts.max(1);
597 self.replay_key = self
598 .tool_call_id
599 .as_ref()
600 .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
601 self
602 }
603
604 pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
605 self.prepared_payload = payload;
606 self
607 }
608
609 #[cfg(any(test, feature = "testing"))]
612 #[doc(hidden)]
613 #[expect(
614 clippy::too_many_arguments,
615 reason = "test-only constructor mirrors the sealed runtime tool context"
616 )]
617 pub fn __for_testing(
618 session_id: String,
619 sessions: Arc<dyn SessionStateService>,
620 session_lifecycle: Arc<dyn SessionLifecycleService>,
621 session_graph: Arc<dyn SessionGraphService>,
622 processes: Arc<dyn crate::ProcessService>,
623 attachment_store: Arc<dyn AttachmentStore>,
624 direct_completions: crate::DirectCompletionClient<'static>,
625 tool_call_id: Option<String>,
626 ) -> ToolContext<'static> {
627 ToolContext::builder(
628 session_id,
629 sessions,
630 session_lifecycle,
631 session_graph,
632 processes,
633 Arc::new(crate::DefaultProcessCancelAbility),
634 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
635 crate::InlineRuntimeEffectController,
636 )),
637 attachment_store,
638 direct_completions,
639 )
640 .tool_call_id(tool_call_id)
641 .build()
642 }
643
644 #[cfg(any(test, feature = "testing"))]
648 #[doc(hidden)]
649 #[expect(
650 clippy::too_many_arguments,
651 reason = "test-only constructor mirrors the sealed runtime context"
652 )]
653 pub fn __for_testing_with_process_cancel_ability(
654 session_id: String,
655 sessions: Arc<dyn SessionStateService>,
656 session_lifecycle: Arc<dyn SessionLifecycleService>,
657 session_graph: Arc<dyn SessionGraphService>,
658 processes: Arc<dyn crate::ProcessService>,
659 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
660 attachment_store: Arc<dyn AttachmentStore>,
661 direct_completions: crate::DirectCompletionClient<'static>,
662 tool_call_id: Option<String>,
663 ) -> ToolContext<'static> {
664 ToolContext::builder(
665 session_id,
666 sessions,
667 session_lifecycle,
668 session_graph,
669 processes,
670 process_cancel_ability,
671 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
672 crate::InlineRuntimeEffectController,
673 )),
674 attachment_store,
675 direct_completions,
676 )
677 .tool_call_id(tool_call_id)
678 .build()
679 }
680}
681
/// Facade for durable effects, borrowed from a [`ToolContext`].
///
/// Obtained through `ToolContext::durable_effects`, which checks that a tool
/// call id is present and that the effect controller supports durable effects.
pub struct ToolDurableEffects<'ctx, 'run> {
    /// The tool context all durable-effect operations act on.
    context: &'ctx ToolContext<'run>,
}
690
691impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
692 pub async fn run_json<F, Fut>(
693 &self,
694 step_id: impl Into<String>,
695 input: serde_json::Value,
696 run: F,
697 ) -> Result<serde_json::Value, crate::RuntimeError>
698 where
699 F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
700 Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
701 {
702 let step_id = step_id.into();
703 if step_id.trim().is_empty() {
704 return Err(crate::RuntimeError::new(
705 "durable_effect_empty_step_id",
706 "durable effect step id must be non-empty",
707 ));
708 }
709 self.context.durable_effects.reserve_step(&step_id)?;
710 let invocation = self.step_invocation(
711 format!("durable-step:{step_id}"),
712 crate::RuntimeEffectKind::DurableStep,
713 format!("durable-step:{step_id}"),
714 )?;
715 let outcome = self
716 .context
717 .effect_controller
718 .controller()
719 .execute_effect(
720 crate::RuntimeEffectEnvelope::new(
721 invocation,
722 crate::RuntimeEffectCommand::DurableStep {
723 step_id,
724 input: input.clone(),
725 },
726 ),
727 crate::RuntimeEffectLocalExecutor::durable_step(run),
728 )
729 .await
730 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
731 outcome
732 .into_durable_step()
733 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
734 }
735
736 pub async fn external_event_key(
737 &self,
738 key: impl Into<String>,
739 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
740 let key = key.into();
741 if key.trim().is_empty() {
742 return Err(crate::RuntimeError::new(
743 "durable_effect_empty_event_key",
744 "durable effect external event key must be non-empty",
745 ));
746 }
747 let scoped = self.context.effect_controller.scoped();
748 scoped
749 .controller()
750 .await_event_key(
751 scoped.execution_scope(),
752 crate::AwaitEventWaitIdentity::Custom { key },
753 )
754 .await
755 }
756
757 pub async fn await_event_json(
758 &self,
759 key: crate::AwaitEventKey,
760 ) -> Result<serde_json::Value, crate::RuntimeError> {
761 let invocation = self.step_invocation(
762 format!("await-event:{}", key.key_id),
763 crate::RuntimeEffectKind::AwaitEvent,
764 format!("await-event:{}", key.key_id),
765 )?;
766 let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
767 let outcome = self
768 .context
769 .effect_controller
770 .controller()
771 .execute_effect(
772 crate::RuntimeEffectEnvelope::new(
773 invocation,
774 crate::RuntimeEffectCommand::AwaitEvent { key },
775 ),
776 crate::RuntimeEffectLocalExecutor::await_event(cancellation, None),
777 )
778 .await
779 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
780 match outcome
781 .into_await_event()
782 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
783 {
784 crate::Resolution::Ok(value) => Ok(value),
785 crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
786 crate::Resolution::Timeout => Err(crate::RuntimeError::new(
787 "durable_effect_event_timeout",
788 "durable effect external event wait timed out",
789 )),
790 crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
791 "durable_effect_event_cancelled",
792 "durable effect external event wait was cancelled",
793 )),
794 }
795 }
796
797 pub async fn emit_process_event(
798 &self,
799 event_type: impl Into<String>,
800 payload: serde_json::Value,
801 ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
802 let Some(process) = self.context.process_events.as_ref() else {
803 return Err(crate::RuntimeError::new(
804 "durable_effect_process_event_unavailable",
805 "durable effect process events are unavailable outside a durable process",
806 ));
807 };
808 let event_type = event_type.into();
809 if event_type.trim().is_empty() {
810 return Err(crate::RuntimeError::new(
811 "durable_effect_empty_process_event_type",
812 "durable effect process event type must be non-empty",
813 ));
814 }
815 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
816 crate::RuntimeError::new(
817 "durable_effects_missing_call_id",
818 "durable effects require a prepared tool call id",
819 )
820 })?;
821 let sequence = self.context.durable_effects.next_process_event_sequence();
822 let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
823 format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
824 );
825 self.context
826 .process_events()
827 .emit_request(request)
828 .await
829 .map_err(|err| {
830 crate::RuntimeError::new(
831 "durable_effect_process_event_append_failed",
832 err.to_string(),
833 )
834 })
835 .and_then(|event| {
836 if event.process_id == process.process_id {
837 Ok(event)
838 } else {
839 Err(crate::RuntimeError::new(
840 "durable_effect_process_event_process_mismatch",
841 "process event append returned an event for a different process",
842 ))
843 }
844 })
845 }
846
847 fn step_invocation(
848 &self,
849 effect_id_suffix: impl Into<String>,
850 kind: crate::RuntimeEffectKind,
851 replay_suffix: impl AsRef<str>,
852 ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
853 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
854 crate::RuntimeError::new(
855 "durable_effects_missing_call_id",
856 "durable effects require a prepared tool call id",
857 )
858 })?;
859 let effect_id_suffix = effect_id_suffix.into();
860 if let Some(parent) = self.context.parent_invocation.as_ref() {
861 return Ok(crate::runtime::causal::child_effect_invocation(
862 parent,
863 format!("{tool_call_id}:{effect_id_suffix}"),
864 kind,
865 replay_suffix,
866 ));
867 }
868 let scoped = self.context.effect_controller.scoped();
869 let replay_key = format!(
870 "{}:tool:{tool_call_id}:{}",
871 scoped.scope_id(),
872 replay_suffix.as_ref()
873 );
874 Ok(crate::RuntimeInvocation::effect(
875 crate::RuntimeScope::new(self.context.session_id.clone()),
876 format!("{tool_call_id}:{effect_id_suffix}"),
877 kind,
878 replay_key,
879 ))
880 }
881}
882
/// A tool call after preparation: the original call data plus an optional
/// payload produced while preparing it.
///
/// Serializable; `replay` is omitted when `None` and `prepared_payload` is
/// omitted when `Null`, and both default when absent on input.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PreparedToolCall {
    /// Id of the tool call.
    pub call_id: String,
    /// Name of the tool being called.
    pub tool_name: String,
    /// Arguments of the call as JSON.
    pub args: serde_json::Value,
    /// Provider replay metadata, when available.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay: Option<ProviderReplayMeta>,
    /// Payload attached during preparation; `Null` when there is none.
    #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
    pub prepared_payload: serde_json::Value,
}
898
899impl PreparedToolCall {
900 pub fn identity(call: crate::sansio::PendingToolCall) -> Self {
901 Self {
902 call_id: call.call_id,
903 tool_name: call.tool_name,
904 args: call.args,
905 replay: call.replay,
906 prepared_payload: serde_json::Value::Null,
907 }
908 }
909
910 pub fn from_parts(
911 call_id: impl Into<String>,
912 tool_name: impl Into<String>,
913 args: serde_json::Value,
914 replay: Option<ProviderReplayMeta>,
915 prepared_payload: serde_json::Value,
916 ) -> Self {
917 Self {
918 call_id: call_id.into(),
919 tool_name: tool_name.into(),
920 args,
921 replay,
922 prepared_payload,
923 }
924 }
925}
926
/// Context available to a tool provider while it prepares a tool call.
#[derive(Clone)]
pub struct ToolPrepareContext {
    /// Id of the session the call belongs to.
    session_id: String,
    /// Session state service used for snapshots and tool catalogs.
    sessions: Arc<dyn SessionStateService>,
    /// Turn context; also the source of plugin inputs.
    turn_context: crate::TurnContext,
    /// Id of the tool call being prepared, if known.
    tool_call_id: Option<String>,
}
934
935impl ToolPrepareContext {
936 pub(crate) fn new(
937 session_id: String,
938 sessions: Arc<dyn SessionStateService>,
939 turn_context: crate::TurnContext,
940 tool_call_id: Option<String>,
941 ) -> Self {
942 Self {
943 session_id,
944 sessions,
945 turn_context,
946 tool_call_id,
947 }
948 }
949
950 pub fn session_id(&self) -> &str {
951 &self.session_id
952 }
953
954 pub fn tool_call_id(&self) -> Option<&str> {
955 self.tool_call_id.as_deref()
956 }
957
958 pub fn turn_context(&self) -> &crate::TurnContext {
959 &self.turn_context
960 }
961
962 pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
963 where
964 T: 'static,
965 {
966 self.turn_context.plugin_input::<T>(plugin_id)
967 }
968
969 pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
970 self.sessions.snapshot_session(&self.session_id).await
971 }
972
973 pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
974 self.sessions.tool_catalog(&self.session_id).await
975 }
976
977 pub async fn shared_tool_catalog(
978 &self,
979 ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
980 self.sessions.shared_tool_catalog(&self.session_id).await
981 }
982}
983
/// Input to `ToolProvider::prepare_tool_call`.
pub struct ToolPrepareCall<'a> {
    /// The pending call to prepare.
    pub pending: crate::sansio::PendingToolCall,
    /// Context available during preparation.
    pub context: &'a ToolPrepareContext,
}
989
/// Input to `ToolProvider::execute`.
pub struct ToolCall<'a> {
    /// Name of the tool to run.
    pub name: &'a str,
    /// Arguments of the call as JSON.
    pub args: &'a serde_json::Value,
    /// Tool context for this call.
    pub context: &'a ToolContext<'a>,
    /// Optional channel for [`SandboxMessage`] values.
    /// NOTE(review): presumably used to report progress, per the alias name —
    /// confirm against callers.
    pub progress: Option<&'a ProgressSender>,
}
1002
1003#[async_trait::async_trait]
1011pub trait ToolProvider: Send + Sync + 'static {
1012 fn tool_manifests(&self) -> Vec<ToolManifest>;
1013 fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1014 self.tool_manifests()
1015 .into_iter()
1016 .find(|manifest| manifest.name == name)
1017 }
1018 fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1019 async fn prepare_tool_call(
1020 &self,
1021 call: ToolPrepareCall<'_>,
1022 ) -> Result<PreparedToolCall, ToolResult> {
1023 Ok(PreparedToolCall::identity(call.pending))
1024 }
1025 async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1026}
1027
1028#[cfg(test)]
1029mod tests {
1030 use super::*;
1031 use crate::ProcessRegistry;
1032 use crate::RuntimeEffectController;
1033 use std::sync::atomic::{AtomicU64, Ordering};
1034
    /// Test effect controller whose `execute_effect` always fails.
    struct NoDurableEffectController;
1036
1037 #[async_trait::async_trait]
1038 impl crate::RuntimeEffectController for NoDurableEffectController {
1039 async fn execute_effect(
1040 &self,
1041 _envelope: crate::RuntimeEffectEnvelope,
1042 _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
1043 ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
1044 Err(crate::RuntimeEffectControllerError::new(
1045 "unexpected_effect",
1046 "test controller should not execute effects",
1047 ))
1048 }
1049 }
1050
1051 fn test_context_with_controller(
1052 tool_call_id: Option<String>,
1053 controller: Arc<dyn crate::RuntimeEffectController>,
1054 ) -> ToolContext<'static> {
1055 ToolContext::builder(
1056 "session-1".to_string(),
1057 Arc::new(crate::testing::MockSessionManager::default()),
1058 Arc::new(crate::testing::MockSessionManager::default()),
1059 Arc::new(crate::testing::MockSessionManager::default()),
1060 Arc::new(crate::UnavailableProcessService),
1061 Arc::new(crate::DefaultProcessCancelAbility),
1062 crate::runtime::RuntimeEffectControllerHandle::shared(controller),
1063 Arc::new(crate::InMemoryAttachmentStore::new()),
1064 crate::DirectCompletionClient::unavailable(
1065 "direct completions are unavailable in this test context",
1066 ),
1067 )
1068 .tool_call_id(tool_call_id)
1069 .build()
1070 }
1071
/// Checks that values supplied to the `ToolContext` builder (prepared call,
/// cancellation token, async process id) are readable from the built context.
#[test]
fn tool_context_builder_carries_call_payload_and_cancellation_state() {
    // The same token is cloned into both the cancellation slot and the
    // async-process registration.
    let cancel_token = tokio_util::sync::CancellationToken::new();
    let expected_payload = serde_json::json!({ "prepared": true });
    let prepared_call = PreparedToolCall::from_parts(
        "call-1",
        "demo_tool",
        serde_json::json!({ "input": true }),
        None,
        expected_payload.clone(),
    );

    let effect_handle = crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
        crate::InlineRuntimeEffectController,
    ));
    let direct_completions = crate::DirectCompletionClient::unavailable(
        "direct completions are unavailable in this test context",
    );

    let context = ToolContext::builder(
        String::from("session-1"),
        Arc::new(crate::testing::MockSessionManager::default()),
        Arc::new(crate::testing::MockSessionManager::default()),
        Arc::new(crate::testing::MockSessionManager::default()),
        Arc::new(crate::UnavailableProcessService),
        Arc::new(crate::DefaultProcessCancelAbility),
        effect_handle,
        Arc::new(crate::InMemoryAttachmentStore::new()),
        direct_completions,
    )
    .prepared_call(&prepared_call)
    .cancellation_token(Some(cancel_token.clone()))
    .async_process("process-1", cancel_token.clone())
    .build();

    assert_eq!(context.session_id(), "session-1");
    // The call id comes from the prepared call, not from an explicit setter.
    assert_eq!(context.tool_call_id(), Some("call-1"));
    assert_eq!(context.prepared_payload(), &expected_payload);
    assert_eq!(context.async_process_id(), Some("process-1"));
    assert!(context.cancellation_token().is_some());
}
1112
/// `durable_effects()` must fail with a distinct error code when the context
/// has no tool call id, and when the controller is `NoDurableEffectController`.
#[test]
fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
    // Case 1: inline controller, but no tool call id on the context.
    let without_call_id =
        test_context_with_controller(None, Arc::new(crate::InlineRuntimeEffectController));
    let Err(missing_id_error) = without_call_id.durable_effects() else {
        panic!("missing prepared tool call id should fail");
    };
    assert_eq!(
        missing_id_error.code.as_str(),
        "durable_effects_missing_call_id"
    );

    // Case 2: a call id is present, but the controller is the rejecting test double.
    let with_rejecting_controller = test_context_with_controller(
        Some(String::from("call-1")),
        Arc::new(NoDurableEffectController),
    );
    let Err(unavailable_error) = with_rejecting_controller.durable_effects() else {
        panic!("unsupported controller should fail");
    };
    assert_eq!(
        unavailable_error.code.as_str(),
        "durable_effects_unavailable"
    );
}
1133
1134 #[tokio::test]
1135 async fn durable_run_json_executes_and_maps_closure_errors() {
1136 let context = test_context_with_controller(
1137 Some("call-run-json".to_string()),
1138 Arc::new(crate::InlineRuntimeEffectController),
1139 );
1140 let durable = context.durable_effects().expect("durable effects");
1141 let value = durable
1142 .run_json(
1143 "create",
1144 serde_json::json!({ "x": 1 }),
1145 |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
1146 )
1147 .await
1148 .expect("durable step");
1149 assert_eq!(value, serde_json::json!({ "seen": 1 }));
1150
1151 let err = durable
1152 .run_json("fail", serde_json::json!({}), |_| async {
1153 Err(crate::RuntimeError::new(
1154 "durable_step_failed",
1155 "step failed",
1156 ))
1157 })
1158 .await
1159 .expect_err("closure error");
1160 assert_eq!(err.code.as_str(), "durable_step_failed");
1161 assert_eq!(err.message, "step failed");
1162 }
1163
1164 #[tokio::test]
1165 async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
1166 let context = test_context_with_controller(
1167 Some("call-step-ids".to_string()),
1168 Arc::new(crate::InlineRuntimeEffectController),
1169 );
1170 let durable = context.durable_effects().expect("durable effects");
1171 let runs = Arc::new(AtomicU64::new(0));
1172
1173 let err = durable
1174 .run_json("", serde_json::Value::Null, {
1175 let runs = Arc::clone(&runs);
1176 move |_| async move {
1177 runs.fetch_add(1, Ordering::Relaxed);
1178 Ok(serde_json::Value::Null)
1179 }
1180 })
1181 .await
1182 .expect_err("empty step id");
1183 assert_eq!(err.code.as_str(), "durable_effect_empty_step_id");
1184 assert_eq!(runs.load(Ordering::Relaxed), 0);
1185
1186 durable
1187 .run_json("same", serde_json::Value::Null, {
1188 let runs = Arc::clone(&runs);
1189 move |_| async move {
1190 runs.fetch_add(1, Ordering::Relaxed);
1191 Ok(serde_json::Value::Null)
1192 }
1193 })
1194 .await
1195 .expect("first step");
1196 let err = durable
1197 .run_json("same", serde_json::Value::Null, {
1198 let runs = Arc::clone(&runs);
1199 move |_| async move {
1200 runs.fetch_add(1, Ordering::Relaxed);
1201 Ok(serde_json::Value::Null)
1202 }
1203 })
1204 .await
1205 .expect_err("duplicate step id");
1206 assert_eq!(err.code.as_str(), "durable_effect_duplicate_step_id");
1207 assert_eq!(runs.load(Ordering::Relaxed), 1);
1208 }
1209
1210 #[tokio::test]
1211 async fn durable_external_event_key_is_custom_and_stable() {
1212 let context = test_context_with_controller(
1213 Some("call-event-key".to_string()),
1214 Arc::new(crate::InlineRuntimeEffectController),
1215 );
1216 let durable = context.durable_effects().expect("durable effects");
1217 let first = durable
1218 .external_event_key("tool-event-stable")
1219 .await
1220 .expect("first key");
1221 let second = durable
1222 .external_event_key("tool-event-stable")
1223 .await
1224 .expect("second key");
1225
1226 assert_eq!(first, second);
1227 assert_eq!(
1228 first.wait,
1229 crate::AwaitEventWaitIdentity::Custom {
1230 key: "tool-event-stable".to_string()
1231 }
1232 );
1233 }
1234
1235 #[tokio::test]
1236 async fn durable_await_event_json_maps_terminal_resolutions() {
1237 let controller = Arc::new(crate::InlineRuntimeEffectController);
1238 let context =
1239 test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
1240 let durable = context.durable_effects().expect("durable effects");
1241
1242 let ok_key = durable
1243 .external_event_key("tool-event-ok")
1244 .await
1245 .expect("ok key");
1246 controller
1247 .resolve_await_event(
1248 &ok_key,
1249 crate::Resolution::Ok(serde_json::json!({ "answer": 42 })),
1250 )
1251 .await
1252 .expect("resolve ok");
1253 let value = durable
1254 .await_event_json(ok_key)
1255 .await
1256 .expect("await ok value");
1257 assert_eq!(value, serde_json::json!({ "answer": 42 }));
1258
1259 let err_key = durable
1260 .external_event_key("tool-event-err")
1261 .await
1262 .expect("err key");
1263 controller
1264 .resolve_await_event(
1265 &err_key,
1266 crate::Resolution::Err(crate::ExternalCompletionError::new(
1267 "external_bad",
1268 "external failed",
1269 )),
1270 )
1271 .await
1272 .expect("resolve err");
1273 let err = durable
1274 .await_event_json(err_key)
1275 .await
1276 .expect_err("await err value");
1277 assert_eq!(err.code.as_str(), "external_bad");
1278
1279 let cancelled_key = durable
1280 .external_event_key("tool-event-cancelled")
1281 .await
1282 .expect("cancelled key");
1283 controller
1284 .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
1285 .await
1286 .expect("resolve cancelled");
1287 let err = durable
1288 .await_event_json(cancelled_key)
1289 .await
1290 .expect_err("await cancelled value");
1291 assert_eq!(err.code.as_str(), "durable_effect_event_cancelled");
1292
1293 let timeout_key = durable
1294 .external_event_key("tool-event-timeout")
1295 .await
1296 .expect("timeout key");
1297 controller
1298 .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
1299 .await
1300 .expect("resolve timeout");
1301 let err = durable
1302 .await_event_json(timeout_key)
1303 .await
1304 .expect_err("await timeout value");
1305 assert_eq!(err.code.as_str(), "durable_effect_event_timeout");
1306 }
1307
1308 #[tokio::test]
1309 async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
1310 let context = test_context_with_controller(
1311 Some("call-no-process".to_string()),
1312 Arc::new(crate::InlineRuntimeEffectController),
1313 );
1314 let err = context
1315 .durable_effects()
1316 .expect("durable effects")
1317 .emit_process_event("tool.event", serde_json::json!({}))
1318 .await
1319 .expect_err("outside process");
1320 assert_eq!(
1321 err.code.as_str(),
1322 "durable_effect_process_event_unavailable"
1323 );
1324
1325 let registry = Arc::new(crate::TestLocalProcessRegistry::default());
1326 let process_id = "process:durable-tool-event";
1327 registry
1328 .register_process(
1329 crate::ProcessRegistration::new(
1330 process_id,
1331 crate::ProcessInput::External {
1332 metadata: serde_json::json!({}),
1333 },
1334 crate::ProcessProvenance::host(),
1335 )
1336 .with_extra_event_types([crate::ProcessEventType {
1337 name: "tool.event".to_string(),
1338 payload_schema: crate::LashSchema::any(),
1339 semantics: crate::ProcessEventSemanticsSpec::default(),
1340 }]),
1341 )
1342 .await
1343 .expect("register process");
1344 let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;
1345 let context = test_context_with_controller(
1346 Some("call-process-event".to_string()),
1347 Arc::new(crate::InlineRuntimeEffectController),
1348 )
1349 .with_process_events_for_testing(process_id, registry_dyn);
1350
1351 let event = context
1352 .durable_effects()
1353 .expect("durable effects")
1354 .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
1355 .await
1356 .expect("process event");
1357 assert_eq!(event.process_id, process_id);
1358 assert_eq!(event.event_type, "tool.event");
1359 assert_eq!(event.payload, serde_json::json!({ "ok": true }));
1360 assert_eq!(
1361 event.invocation.replay_key(),
1362 Some("tool:call-process-event:durable-process-event:0")
1363 );
1364
1365 let append_err = context
1366 .durable_effects()
1367 .expect("durable effects")
1368 .emit_process_event("undeclared.event", serde_json::json!({}))
1369 .await
1370 .expect_err("undeclared event type must fail the append");
1371 assert_eq!(
1372 append_err.code.as_str(),
1373 "durable_effect_process_event_append_failed"
1374 );
1375 }
1376}