1use std::collections::HashSet;
2use std::future::Future;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use lash_sansio::llm::types::ProviderReplayMeta;
7use serde::{Deserialize, Serialize};
8
9use crate::plugin::{
10 PluginError, SessionGraphService, SessionLifecycleService, SessionSnapshot, SessionStateService,
11};
12use crate::{AttachmentStore, ToolContract, ToolManifest, ToolResult};
13
14mod attachments;
15mod direct_completion;
16mod dispatch;
17mod process;
18pub(crate) mod process_events;
19mod session;
20mod triggers;
21
22pub use attachments::ToolAttachmentClient;
23pub use direct_completion::ToolDirectCompletionClient;
24pub use dispatch::ToolDispatchClient;
25pub use process::ToolSessionProcessAdmin;
26pub use process_events::ToolProcessEventClient;
27pub use session::{ToolSessionAdmin, ToolSessionModel};
28pub use triggers::ToolTriggerClient;
29
/// A single message delivered over a [`ProgressSender`] channel.
#[derive(Clone, Debug)]
pub struct SandboxMessage {
    /// Message body.
    pub text: String,
    /// Label attached to the message; the set of valid values is not defined in this file.
    pub kind: String,
}
37
/// Unbounded Tokio mpsc sender used to deliver [`SandboxMessage`] values.
pub type ProgressSender = tokio::sync::mpsc::UnboundedSender<SandboxMessage>;
40
41#[derive(Clone, Default)]
42pub(crate) struct ToolCompletionState {
43 key: Arc<Mutex<Option<crate::AwaitEventKey>>>,
44}
45
46impl ToolCompletionState {
47 fn store(
48 &self,
49 key: crate::AwaitEventKey,
50 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
51 let mut guard = self.key.lock().map_err(|_| {
52 crate::RuntimeError::new(
53 "tool_completion_state_poisoned",
54 "tool completion key state lock poisoned",
55 )
56 })?;
57 if let Some(existing) = guard.as_ref() {
58 return Ok(existing.clone());
59 }
60 *guard = Some(key.clone());
61 Ok(key)
62 }
63
64 pub(crate) fn take(&self) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
65 self.key.lock().map(|mut guard| guard.take()).map_err(|_| {
66 crate::RuntimeError::new(
67 "tool_completion_state_poisoned",
68 "tool completion key state lock poisoned",
69 )
70 })
71 }
72}
73
74#[derive(Clone, Default)]
75pub(crate) struct ToolDurableEffectState {
76 step_ids: Arc<Mutex<HashSet<String>>>,
77 process_event_sequence: Arc<AtomicU64>,
78}
79
80impl ToolDurableEffectState {
81 fn reserve_step(&self, step_id: &str) -> Result<(), crate::RuntimeError> {
82 let mut guard = self.step_ids.lock().map_err(|_| {
83 crate::RuntimeError::new(
84 "durable_effect_state_poisoned",
85 "durable effect step state lock poisoned",
86 )
87 })?;
88 if !guard.insert(step_id.to_string()) {
89 return Err(crate::RuntimeError::new(
90 "durable_effect_duplicate_step_id",
91 format!("durable effect step id `{step_id}` was already used by this tool call"),
92 ));
93 }
94 Ok(())
95 }
96
97 fn next_process_event_sequence(&self) -> u64 {
98 self.process_event_sequence.fetch_add(1, Ordering::Relaxed)
99 }
100}
101
/// Per-invocation context handed to a tool while it executes.
///
/// Bundles the session identity, runtime service handles, and call-specific
/// state (call id, prepared payload, retry counters, completion and
/// durable-effect bookkeeping). Cloning shares the `Arc`-backed services and
/// the completion / durable-effect state.
#[derive(Clone)]
pub struct ToolContext<'run> {
    /// Session id; exposed by `session_id()`.
    pub(crate) session_id: String,
    /// Agent frame id; exposed by `agent_frame_id()`.
    pub(crate) agent_frame_id: crate::AgentFrameId,
    /// Session state service backing `sessions()`.
    pub(crate) sessions: Arc<dyn SessionStateService>,
    /// Session lifecycle service backing `sessions()`.
    pub(crate) session_lifecycle: Arc<dyn SessionLifecycleService>,
    /// Process service backing `processes()`.
    pub(crate) processes: Arc<dyn crate::ProcessService>,
    /// Process cancellation capability backing `processes()`.
    pub(crate) process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    /// Effect controller handle used for durable effects and await-event keys.
    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
    /// Dispatch context this context was built from, when created via `from_dispatch`.
    pub(crate) runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
    /// Optional cancellation token; exposed by `cancellation_token()`.
    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
    /// Async process id; takes precedence in `runtime_process_id()`.
    pub(crate) async_process_id: Option<String>,
    /// Explicit runtime process id; second choice in `runtime_process_id()`.
    pub(crate) runtime_process_id: Option<String>,
    /// Process-event context, when process events are available.
    pub(crate) process_events: Option<ToolProcessEventContext>,
    /// Attachment store backing `attachments()`.
    pub(crate) attachment_store: Arc<dyn AttachmentStore>,
    /// Direct completion client backing `direct_completions()`.
    pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
    /// Payload produced when the call was prepared; JSON `null` when none was set.
    pub(crate) prepared_payload: serde_json::Value,
    /// Id of the prepared tool call, when known.
    pub(crate) tool_call_id: Option<String>,
    /// Attempt counter; `build` starts it at 1 and `with_retry_context` keeps it at least 1.
    pub(crate) attempt_number: u32,
    /// Attempt limit; `build` starts it at 1 and `with_retry_context` keeps it at least 1.
    pub(crate) max_attempts: u32,
    /// Replay key derived by `with_retry_context` when a tool call id is present.
    pub(crate) replay_key: Option<String>,
    /// Slot for the completion await-event key.
    pub(crate) completion: ToolCompletionState,
    /// Reserved step ids and process-event sequence for durable effects.
    pub(crate) durable_effects: ToolDurableEffectState,
    /// Parent invocation used when deriving child effect invocations.
    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
    /// Execution environment spec passed on to `processes()`.
    pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
    /// Call site used by `emit_lashlang_child_process_started`; `None` disables that trace.
    pub(crate) lashlang_execution_call_site: Option<ToolLashlangExecutionCallSite>,
}
132
133#[derive(Clone)]
134pub struct ToolLashlangExecutionCallSite {
135 sink: Arc<dyn lash_trace::TraceSink>,
136 base_context: lash_trace::TraceContext,
137 identity: lash_trace::TraceLashlangExecutionIdentity,
138 parent_node_id: String,
139 occurrence: u64,
140}
141
142impl ToolLashlangExecutionCallSite {
143 pub fn new(
144 sink: Arc<dyn lash_trace::TraceSink>,
145 base_context: lash_trace::TraceContext,
146 identity: lash_trace::TraceLashlangExecutionIdentity,
147 parent_node_id: impl Into<String>,
148 occurrence: u64,
149 ) -> Self {
150 Self {
151 sink,
152 base_context,
153 identity,
154 parent_node_id: parent_node_id.into(),
155 occurrence,
156 }
157 }
158}
159
/// Handles needed to emit process events for one process from a tool.
#[derive(Clone)]
pub(crate) struct ToolProcessEventContext {
    /// Id of the process the events belong to.
    process_id: String,
    /// Process registry handle.
    registry: Arc<dyn crate::ProcessRegistry>,
    /// Optional runtime persistence handle.
    store: Option<Arc<dyn crate::RuntimePersistence>>,
    /// Optional session store factory.
    session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
    /// Session graph service handle.
    session_graph: Arc<dyn SessionGraphService>,
    /// Optional queued-work poke handle.
    queued_work_poke: Option<crate::QueuedWorkPoke>,
}
169
/// Staged inputs for constructing a [`ToolContext`].
///
/// Carries the values that `build` moves into the context, plus a
/// `session_graph` handle that is only used to populate the process-event
/// context. Retry-related context fields are not staged here; `build` fills
/// them with defaults.
pub(crate) struct ToolContextBuilder<'run> {
    session_id: String,
    agent_frame_id: crate::AgentFrameId,
    sessions: Arc<dyn SessionStateService>,
    session_lifecycle: Arc<dyn SessionLifecycleService>,
    // Not copied into `ToolContext`; cloned into `ToolProcessEventContext` by `process_events`.
    session_graph: Arc<dyn SessionGraphService>,
    processes: Arc<dyn crate::ProcessService>,
    process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
    runtime_dispatch: Option<Arc<crate::tool_dispatch::ToolDispatchContext<'run>>>,
    cancellation_token: Option<tokio_util::sync::CancellationToken>,
    async_process_id: Option<String>,
    runtime_process_id: Option<String>,
    process_events: Option<ToolProcessEventContext>,
    attachment_store: Arc<dyn AttachmentStore>,
    direct_completions: crate::DirectCompletionClient<'run>,
    prepared_payload: serde_json::Value,
    tool_call_id: Option<String>,
    completion: ToolCompletionState,
    durable_effects: ToolDurableEffectState,
    parent_invocation: Option<crate::RuntimeInvocation>,
    execution_env_spec: crate::ProcessExecutionEnvSpec,
    lashlang_execution_call_site: Option<ToolLashlangExecutionCallSite>,
}
194
195impl<'run> ToolContextBuilder<'run> {
196 pub(crate) fn from_dispatch(
197 dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
198 ) -> Self {
199 Self {
200 session_id: dispatch.session_id.clone(),
201 agent_frame_id: dispatch.agent_frame_id.clone(),
202 sessions: Arc::clone(&dispatch.sessions),
203 session_lifecycle: Arc::clone(&dispatch.session_lifecycle),
204 session_graph: Arc::clone(&dispatch.session_graph),
205 processes: Arc::clone(&dispatch.processes),
206 process_cancel_ability: Arc::clone(&dispatch.process_cancel_ability),
207 effect_controller: dispatch.effect_controller.clone(),
208 runtime_dispatch: Some(Arc::clone(&dispatch)),
209 cancellation_token: None,
210 async_process_id: None,
211 runtime_process_id: None,
212 process_events: None,
213 attachment_store: Arc::clone(&dispatch.attachment_store),
214 direct_completions: dispatch.direct_completions.clone(),
215 prepared_payload: serde_json::Value::Null,
216 tool_call_id: None,
217 completion: ToolCompletionState::default(),
218 durable_effects: ToolDurableEffectState::default(),
219 parent_invocation: dispatch.parent_invocation.clone(),
220 execution_env_spec: dispatch.execution_env_spec.clone(),
221 lashlang_execution_call_site: None,
222 }
223 }
224
225 #[cfg(any(test, feature = "testing"))]
226 pub(crate) fn tool_call_id(mut self, tool_call_id: impl Into<Option<String>>) -> Self {
227 self.tool_call_id = tool_call_id.into();
228 self
229 }
230
231 pub(crate) fn prepared_call(mut self, call: &PreparedToolCall) -> Self {
232 self.tool_call_id = Some(call.call_id.clone());
233 self.prepared_payload = call.prepared_payload.clone();
234 self
235 }
236
237 pub(crate) fn cancellation_token(
238 mut self,
239 cancellation_token: Option<tokio_util::sync::CancellationToken>,
240 ) -> Self {
241 self.cancellation_token = cancellation_token;
242 self
243 }
244
245 pub(crate) fn runtime_process_id(mut self, process_id: Option<String>) -> Self {
246 self.runtime_process_id = process_id;
247 self
248 }
249
250 pub(crate) fn async_process(
251 mut self,
252 process_id: impl Into<String>,
253 cancellation_token: tokio_util::sync::CancellationToken,
254 ) -> Self {
255 self.async_process_id = Some(process_id.into());
256 self.cancellation_token = Some(cancellation_token);
257 self
258 }
259
260 pub(crate) fn process_events(
261 mut self,
262 process_id: impl Into<String>,
263 registry: Arc<dyn crate::ProcessRegistry>,
264 store: Option<Arc<dyn crate::RuntimePersistence>>,
265 session_store_factory: Option<Arc<dyn crate::SessionStoreFactory>>,
266 queued_work_poke: Option<crate::QueuedWorkPoke>,
267 ) -> Self {
268 self.process_events = Some(ToolProcessEventContext {
269 process_id: process_id.into(),
270 registry,
271 store,
272 session_store_factory,
273 session_graph: Arc::clone(&self.session_graph),
274 queued_work_poke,
275 });
276 self
277 }
278
279 pub(crate) fn parent_invocation(mut self, metadata: Option<crate::RuntimeInvocation>) -> Self {
280 self.parent_invocation = metadata;
281 self
282 }
283
284 pub(crate) fn lashlang_execution_call_site(
285 mut self,
286 call_site: Option<ToolLashlangExecutionCallSite>,
287 ) -> Self {
288 self.lashlang_execution_call_site = call_site;
289 self
290 }
291
292 pub(crate) fn build(self) -> ToolContext<'run> {
293 ToolContext {
294 session_id: self.session_id,
295 agent_frame_id: self.agent_frame_id,
296 sessions: self.sessions,
297 session_lifecycle: self.session_lifecycle,
298 processes: self.processes,
299 process_cancel_ability: self.process_cancel_ability,
300 effect_controller: self.effect_controller,
301 runtime_dispatch: self.runtime_dispatch,
302 cancellation_token: self.cancellation_token,
303 async_process_id: self.async_process_id,
304 runtime_process_id: self.runtime_process_id,
305 process_events: self.process_events,
306 attachment_store: self.attachment_store,
307 direct_completions: self.direct_completions,
308 prepared_payload: self.prepared_payload,
309 tool_call_id: self.tool_call_id,
310 attempt_number: 1,
311 max_attempts: 1,
312 replay_key: None,
313 completion: self.completion,
314 durable_effects: self.durable_effects,
315 parent_invocation: self.parent_invocation,
316 execution_env_spec: self.execution_env_spec,
317 lashlang_execution_call_site: self.lashlang_execution_call_site,
318 }
319 }
320}
321
322impl<'run> ToolContext<'run> {
323 #[cfg(any(test, feature = "testing"))]
324 #[expect(
325 clippy::too_many_arguments,
326 reason = "testing constructor mirrors the sealed runtime tool context dependencies"
327 )]
328 pub(crate) fn builder(
329 session_id: String,
330 sessions: Arc<dyn SessionStateService>,
331 session_lifecycle: Arc<dyn SessionLifecycleService>,
332 session_graph: Arc<dyn SessionGraphService>,
333 processes: Arc<dyn crate::ProcessService>,
334 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
335 effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
336 attachment_store: Arc<dyn AttachmentStore>,
337 direct_completions: crate::DirectCompletionClient<'run>,
338 ) -> ToolContextBuilder<'run> {
339 ToolContextBuilder {
340 session_id,
341 agent_frame_id: String::new(),
342 sessions,
343 session_lifecycle,
344 session_graph,
345 processes,
346 process_cancel_ability,
347 effect_controller,
348 runtime_dispatch: None,
349 cancellation_token: None,
350 async_process_id: None,
351 runtime_process_id: None,
352 process_events: None,
353 attachment_store,
354 direct_completions,
355 prepared_payload: serde_json::Value::Null,
356 tool_call_id: None,
357 completion: ToolCompletionState::default(),
358 durable_effects: ToolDurableEffectState::default(),
359 parent_invocation: None,
360 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
361 crate::PluginOptions::default(),
362 crate::SessionPolicy::default(),
363 ),
364 lashlang_execution_call_site: None,
365 }
366 }
367
368 pub(crate) fn from_dispatch(
369 dispatch: Arc<crate::tool_dispatch::ToolDispatchContext<'run>>,
370 ) -> ToolContextBuilder<'run> {
371 ToolContextBuilder::from_dispatch(dispatch)
372 }
373
374 pub fn session_id(&self) -> &str {
375 &self.session_id
376 }
377
378 pub fn agent_frame_id(&self) -> &str {
379 &self.agent_frame_id
380 }
381
382 pub fn sessions(&self) -> ToolSessionAdmin<'run> {
383 ToolSessionAdmin {
384 session_id: self.session_id.clone(),
385 sessions: Arc::clone(&self.sessions),
386 session_lifecycle: Arc::clone(&self.session_lifecycle),
387 effect_controller: self.effect_controller.clone(),
388 }
389 }
390
391 pub fn dispatch(&self) -> ToolDispatchClient<'run> {
392 ToolDispatchClient {
393 context: self.clone(),
394 }
395 }
396
397 pub fn triggers(&self) -> ToolTriggerClient<'run> {
398 ToolTriggerClient {
399 context: self.clone(),
400 }
401 }
402
403 pub fn processes(&self) -> ToolSessionProcessAdmin<'run> {
404 ToolSessionProcessAdmin {
405 session_id: self.session_id.clone(),
406 agent_frame_id: self.agent_frame_id.clone(),
407 processes: Arc::clone(&self.processes),
408 process_cancel_ability: Arc::clone(&self.process_cancel_ability),
409 effect_controller: self.effect_controller.clone(),
410 parent_invocation: self.parent_invocation.clone(),
411 tool_call_id: self.tool_call_id.clone(),
412 execution_env_spec: self.execution_env_spec.clone(),
413 }
414 }
415
416 pub fn emit_lashlang_child_process_started(
417 &self,
418 process_id: impl Into<String>,
419 child_entry_name: Option<String>,
420 ) {
421 let Some(call_site) = &self.lashlang_execution_call_site else {
422 return;
423 };
424 let child = lash_trace::TraceLashlangChildExecution {
425 scope: call_site.identity.scope.clone(),
426 subject: lash_trace::TraceRuntimeSubject::Process {
427 process_id: process_id.into(),
428 },
429 module_ref: None,
430 entry_ref: None,
431 entry_name: child_entry_name,
432 };
433 let child_graph_key = child.graph_key();
434 let event = lash_trace::TraceLashlangExecutionEvent::ChildStarted {
435 event_key: format!(
436 "lashlang_execution:{}:child:{}:{}:{}",
437 call_site.identity.graph_key(),
438 call_site.parent_node_id,
439 call_site.occurrence,
440 child_graph_key
441 ),
442 identity: call_site.identity.clone(),
443 parent_node_id: call_site.parent_node_id.clone(),
444 occurrence: call_site.occurrence,
445 child,
446 };
447 let mut context = lash_trace::TraceContext::default()
448 .for_session(call_site.identity.scope.session_id.clone());
449 if let Some(turn_id) = &call_site.identity.scope.turn_id {
450 context = context.for_turn(turn_id.clone());
451 }
452 if let Some(turn_index) = call_site.identity.scope.turn_index {
453 context = context.for_turn_index(turn_index);
454 }
455 if let Some(protocol_iteration) = call_site.identity.scope.protocol_iteration {
456 context = context.for_protocol_iteration(protocol_iteration);
457 }
458 if let lash_trace::TraceRuntimeSubject::Effect { effect_id, .. } =
459 &call_site.identity.subject
460 {
461 context.effect_id = Some(effect_id.clone());
462 }
463 crate::trace::emit_trace(
464 &Some(Arc::clone(&call_site.sink)),
465 &call_site.base_context,
466 context,
467 lash_trace::TraceEvent::LashlangExecution { event },
468 );
469 }
470
471 pub fn direct_completions(&self) -> ToolDirectCompletionClient<'run> {
472 ToolDirectCompletionClient {
473 session_id: self.session_id.clone(),
474 tool_call_id: self.tool_call_id.clone(),
475 direct_completions: self.direct_completions.clone(),
476 }
477 }
478
479 pub fn attachments(&self) -> ToolAttachmentClient {
480 ToolAttachmentClient {
481 store: Arc::clone(&self.attachment_store),
482 }
483 }
484
485 pub fn process_events(&self) -> ToolProcessEventClient {
486 ToolProcessEventClient {
487 context: self.process_events.clone(),
488 }
489 }
490
491 pub fn durable_effects(&self) -> Result<ToolDurableEffects<'_, 'run>, crate::RuntimeError> {
498 let Some(tool_call_id) = self.tool_call_id.as_deref() else {
499 return Err(crate::RuntimeError::new(
500 "durable_effects_missing_call_id",
501 "durable effects require a prepared tool call id",
502 ));
503 };
504 if tool_call_id.trim().is_empty() {
505 return Err(crate::RuntimeError::new(
506 "durable_effects_missing_call_id",
507 "durable effects require a non-empty prepared tool call id",
508 ));
509 }
510 let scoped = self.effect_controller.scoped();
511 if !scoped.controller().supports_durable_effects() {
512 return Err(crate::RuntimeError::new(
513 "durable_effects_unavailable",
514 "this effect controller does not support durable tool effects",
515 ));
516 }
517 Ok(ToolDurableEffects { context: self })
518 }
519
520 pub fn cancellation_token(&self) -> Option<&tokio_util::sync::CancellationToken> {
521 self.cancellation_token.as_ref()
522 }
523
524 pub fn async_process_id(&self) -> Option<&str> {
525 self.async_process_id.as_deref()
526 }
527
528 pub fn runtime_process_id(&self) -> Option<&str> {
529 self.async_process_id
530 .as_deref()
531 .or(self.runtime_process_id.as_deref())
532 .or_else(|| {
533 self.process_events
534 .as_ref()
535 .map(|context| context.process_id.as_str())
536 })
537 }
538
539 pub fn tool_call_id(&self) -> Option<&str> {
540 self.tool_call_id.as_deref()
541 }
542
543 pub fn prepared_payload(&self) -> &serde_json::Value {
544 &self.prepared_payload
545 }
546
547 pub fn decode_prepared_payload<T>(&self) -> Result<T, serde_json::Error>
548 where
549 T: serde::de::DeserializeOwned,
550 {
551 serde_json::from_value(self.prepared_payload.clone())
552 }
553
554 pub fn attempt_number(&self) -> u32 {
555 self.attempt_number
556 }
557
558 pub fn max_attempts(&self) -> u32 {
559 self.max_attempts
560 }
561
562 pub fn replay_key(&self) -> Option<&str> {
563 self.replay_key.as_deref()
564 }
565
566 pub async fn completion_key(&self) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
580 let tool_call_id = self.tool_call_id.clone().ok_or_else(|| {
581 crate::RuntimeError::new(
582 "tool_completion_key_missing_call_id",
583 "completion keys require a prepared tool call id",
584 )
585 })?;
586 let scoped = self.effect_controller.scoped();
587 let key = scoped
588 .controller()
589 .await_event_key(
590 scoped.execution_scope(),
591 crate::AwaitEventWaitIdentity::tool_completion(tool_call_id),
592 )
593 .await?;
594 self.completion.store(key)
595 }
596
597 pub(crate) fn take_completion_key(
598 &self,
599 ) -> Result<Option<crate::AwaitEventKey>, crate::RuntimeError> {
600 self.completion.take()
601 }
602
603 pub fn with_async_process(
604 mut self,
605 process_id: impl Into<String>,
606 cancellation_token: tokio_util::sync::CancellationToken,
607 ) -> Self {
608 self.async_process_id = Some(process_id.into());
609 self.runtime_process_id = self.async_process_id.clone();
610 self.cancellation_token = Some(cancellation_token);
611 self
612 }
613
614 #[cfg(any(test, feature = "testing"))]
615 #[doc(hidden)]
616 pub fn with_process_events_for_testing(
617 mut self,
618 process_id: impl Into<String>,
619 registry: Arc<dyn crate::ProcessRegistry>,
620 ) -> Self {
621 self.process_events = Some(ToolProcessEventContext {
622 process_id: process_id.into(),
623 registry,
624 store: None,
625 session_store_factory: None,
626 session_graph: Arc::new(crate::plugin::NoopSessionManager),
627 queued_work_poke: None,
628 });
629 self
630 }
631
632 pub(crate) fn with_retry_context(
633 mut self,
634 tool_name: &str,
635 attempt_number: u32,
636 max_attempts: u32,
637 ) -> Self {
638 self.attempt_number = attempt_number.max(1);
639 self.max_attempts = max_attempts.max(1);
640 self.replay_key = self
641 .tool_call_id
642 .as_ref()
643 .map(|call_id| format!("lash-tool:{}:{call_id}:{tool_name}", self.session_id));
644 self
645 }
646
647 pub(crate) fn with_prepared_payload(mut self, payload: serde_json::Value) -> Self {
648 self.prepared_payload = payload;
649 self
650 }
651
652 #[cfg(any(test, feature = "testing"))]
655 #[doc(hidden)]
656 #[expect(
657 clippy::too_many_arguments,
658 reason = "test-only constructor mirrors the sealed runtime tool context"
659 )]
660 pub fn __for_testing(
661 session_id: String,
662 sessions: Arc<dyn SessionStateService>,
663 session_lifecycle: Arc<dyn SessionLifecycleService>,
664 session_graph: Arc<dyn SessionGraphService>,
665 processes: Arc<dyn crate::ProcessService>,
666 attachment_store: Arc<dyn AttachmentStore>,
667 direct_completions: crate::DirectCompletionClient<'static>,
668 tool_call_id: Option<String>,
669 ) -> ToolContext<'static> {
670 ToolContext::builder(
671 session_id,
672 sessions,
673 session_lifecycle,
674 session_graph,
675 processes,
676 Arc::new(crate::DefaultProcessCancelAbility),
677 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
678 crate::InlineRuntimeEffectController,
679 )),
680 attachment_store,
681 direct_completions,
682 )
683 .tool_call_id(tool_call_id)
684 .build()
685 }
686
687 #[cfg(any(test, feature = "testing"))]
691 #[doc(hidden)]
692 #[expect(
693 clippy::too_many_arguments,
694 reason = "test-only constructor mirrors the sealed runtime context"
695 )]
696 pub fn __for_testing_with_process_cancel_ability(
697 session_id: String,
698 sessions: Arc<dyn SessionStateService>,
699 session_lifecycle: Arc<dyn SessionLifecycleService>,
700 session_graph: Arc<dyn SessionGraphService>,
701 processes: Arc<dyn crate::ProcessService>,
702 process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
703 attachment_store: Arc<dyn AttachmentStore>,
704 direct_completions: crate::DirectCompletionClient<'static>,
705 tool_call_id: Option<String>,
706 ) -> ToolContext<'static> {
707 ToolContext::builder(
708 session_id,
709 sessions,
710 session_lifecycle,
711 session_graph,
712 processes,
713 process_cancel_ability,
714 crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
715 crate::InlineRuntimeEffectController,
716 )),
717 attachment_store,
718 direct_completions,
719 )
720 .tool_call_id(tool_call_id)
721 .build()
722 }
723}
724
725pub struct ToolDurableEffects<'ctx, 'run> {
731 context: &'ctx ToolContext<'run>,
732}
733
734impl<'ctx, 'run> ToolDurableEffects<'ctx, 'run> {
735 pub async fn run_json<F, Fut>(
736 &self,
737 step_id: impl Into<String>,
738 input: serde_json::Value,
739 run: F,
740 ) -> Result<serde_json::Value, crate::RuntimeError>
741 where
742 F: FnOnce(serde_json::Value) -> Fut + Send + 'run,
743 Fut: Future<Output = Result<serde_json::Value, crate::RuntimeError>> + Send + 'run,
744 {
745 let step_id = step_id.into();
746 if step_id.trim().is_empty() {
747 return Err(crate::RuntimeError::new(
748 "durable_effect_empty_step_id",
749 "durable effect step id must be non-empty",
750 ));
751 }
752 self.context.durable_effects.reserve_step(&step_id)?;
753 let invocation = self.step_invocation(
754 format!("durable-step:{step_id}"),
755 crate::RuntimeEffectKind::DurableStep,
756 format!("durable-step:{step_id}"),
757 )?;
758 let outcome = self
759 .context
760 .effect_controller
761 .controller()
762 .execute_effect(
763 crate::RuntimeEffectEnvelope::new(
764 invocation,
765 crate::RuntimeEffectCommand::DurableStep {
766 step_id,
767 input: input.clone(),
768 },
769 ),
770 crate::RuntimeEffectLocalExecutor::durable_step(run),
771 )
772 .await
773 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
774 outcome
775 .into_durable_step()
776 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)
777 }
778
779 pub async fn external_event_key(
780 &self,
781 key: impl Into<String>,
782 ) -> Result<crate::AwaitEventKey, crate::RuntimeError> {
783 let key = key.into();
784 if key.trim().is_empty() {
785 return Err(crate::RuntimeError::new(
786 "durable_effect_empty_event_key",
787 "durable effect external event key must be non-empty",
788 ));
789 }
790 let scoped = self.context.effect_controller.scoped();
791 scoped
792 .controller()
793 .await_event_key(
794 scoped.execution_scope(),
795 crate::AwaitEventWaitIdentity::Custom { key },
796 )
797 .await
798 }
799
800 pub async fn await_event_json(
801 &self,
802 key: crate::AwaitEventKey,
803 ) -> Result<serde_json::Value, crate::RuntimeError> {
804 let invocation = self.step_invocation(
805 format!("await-event:{}", key.key_id),
806 crate::RuntimeEffectKind::AwaitEvent,
807 format!("await-event:{}", key.key_id),
808 )?;
809 let cancellation = self.context.cancellation_token.clone().unwrap_or_default();
810 let outcome = self
811 .context
812 .effect_controller
813 .controller()
814 .execute_effect(
815 crate::RuntimeEffectEnvelope::new(
816 invocation,
817 crate::RuntimeEffectCommand::AwaitEvent { key },
818 ),
819 crate::RuntimeEffectLocalExecutor::await_event(cancellation, None),
820 )
821 .await
822 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?;
823 match outcome
824 .into_await_event()
825 .map_err(crate::RuntimeEffectControllerError::into_runtime_error)?
826 {
827 crate::Resolution::Ok(value) => Ok(value),
828 crate::Resolution::Err(err) => Err(crate::RuntimeError::new(err.code, err.message)),
829 crate::Resolution::Timeout => Err(crate::RuntimeError::new(
830 "durable_effect_event_timeout",
831 "durable effect external event wait timed out",
832 )),
833 crate::Resolution::Cancelled => Err(crate::RuntimeError::new(
834 "durable_effect_event_cancelled",
835 "durable effect external event wait was cancelled",
836 )),
837 }
838 }
839
840 pub async fn emit_process_event(
841 &self,
842 event_type: impl Into<String>,
843 payload: serde_json::Value,
844 ) -> Result<crate::ProcessEvent, crate::RuntimeError> {
845 let Some(process) = self.context.process_events.as_ref() else {
846 return Err(crate::RuntimeError::new(
847 "durable_effect_process_event_unavailable",
848 "durable effect process events are unavailable outside a durable process",
849 ));
850 };
851 let event_type = event_type.into();
852 if event_type.trim().is_empty() {
853 return Err(crate::RuntimeError::new(
854 "durable_effect_empty_process_event_type",
855 "durable effect process event type must be non-empty",
856 ));
857 }
858 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
859 crate::RuntimeError::new(
860 "durable_effects_missing_call_id",
861 "durable effects require a prepared tool call id",
862 )
863 })?;
864 let sequence = self.context.durable_effects.next_process_event_sequence();
865 let request = crate::ProcessEventAppendRequest::new(event_type, payload).with_replay_key(
866 format!("tool:{tool_call_id}:durable-process-event:{sequence}"),
867 );
868 self.context
869 .process_events()
870 .emit_request(request)
871 .await
872 .map_err(|err| {
873 crate::RuntimeError::new(
874 "durable_effect_process_event_append_failed",
875 err.to_string(),
876 )
877 })
878 .and_then(|event| {
879 if event.process_id == process.process_id {
880 Ok(event)
881 } else {
882 Err(crate::RuntimeError::new(
883 "durable_effect_process_event_process_mismatch",
884 "process event append returned an event for a different process",
885 ))
886 }
887 })
888 }
889
890 fn step_invocation(
891 &self,
892 effect_id_suffix: impl Into<String>,
893 kind: crate::RuntimeEffectKind,
894 replay_suffix: impl AsRef<str>,
895 ) -> Result<crate::RuntimeInvocation, crate::RuntimeError> {
896 let tool_call_id = self.context.tool_call_id.as_deref().ok_or_else(|| {
897 crate::RuntimeError::new(
898 "durable_effects_missing_call_id",
899 "durable effects require a prepared tool call id",
900 )
901 })?;
902 let effect_id_suffix = effect_id_suffix.into();
903 if let Some(parent) = self.context.parent_invocation.as_ref() {
904 return Ok(crate::runtime::causal::child_effect_invocation(
905 parent,
906 format!("{tool_call_id}:{effect_id_suffix}"),
907 kind,
908 replay_suffix,
909 ));
910 }
911 let scoped = self.context.effect_controller.scoped();
912 let replay_key = format!(
913 "{}:tool:{tool_call_id}:{}",
914 scoped.scope_id(),
915 replay_suffix.as_ref()
916 );
917 Ok(crate::RuntimeInvocation::effect(
918 crate::RuntimeScope::new(self.context.session_id.clone()),
919 format!("{tool_call_id}:{effect_id_suffix}"),
920 kind,
921 replay_key,
922 ))
923 }
924}
925
926#[derive(Clone, Debug, Serialize, Deserialize)]
932pub struct PreparedToolCall {
933 pub call_id: String,
934 pub tool_name: String,
935 pub args: serde_json::Value,
936 #[serde(default, skip_serializing_if = "Option::is_none")]
937 pub replay: Option<ProviderReplayMeta>,
938 #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
939 pub prepared_payload: serde_json::Value,
940}
941
942impl PreparedToolCall {
943 pub fn identity(call: crate::sansio::PendingToolCall) -> Self {
944 Self {
945 call_id: call.call_id,
946 tool_name: call.tool_name,
947 args: call.args,
948 replay: call.replay,
949 prepared_payload: serde_json::Value::Null,
950 }
951 }
952
953 pub fn from_parts(
954 call_id: impl Into<String>,
955 tool_name: impl Into<String>,
956 args: serde_json::Value,
957 replay: Option<ProviderReplayMeta>,
958 prepared_payload: serde_json::Value,
959 ) -> Self {
960 Self {
961 call_id: call_id.into(),
962 tool_name: tool_name.into(),
963 args,
964 replay,
965 prepared_payload,
966 }
967 }
968}
969
970#[derive(Clone)]
971pub struct ToolPrepareContext {
972 session_id: String,
973 sessions: Arc<dyn SessionStateService>,
974 turn_context: crate::TurnContext,
975 tool_call_id: Option<String>,
976}
977
978impl ToolPrepareContext {
979 pub(crate) fn new(
980 session_id: String,
981 sessions: Arc<dyn SessionStateService>,
982 turn_context: crate::TurnContext,
983 tool_call_id: Option<String>,
984 ) -> Self {
985 Self {
986 session_id,
987 sessions,
988 turn_context,
989 tool_call_id,
990 }
991 }
992
993 pub fn session_id(&self) -> &str {
994 &self.session_id
995 }
996
997 pub fn tool_call_id(&self) -> Option<&str> {
998 self.tool_call_id.as_deref()
999 }
1000
1001 pub fn turn_context(&self) -> &crate::TurnContext {
1002 &self.turn_context
1003 }
1004
1005 pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
1006 where
1007 T: 'static,
1008 {
1009 self.turn_context.plugin_input::<T>(plugin_id)
1010 }
1011
1012 pub async fn session_snapshot(&self) -> Result<SessionSnapshot, PluginError> {
1013 self.sessions.snapshot_session(&self.session_id).await
1014 }
1015
1016 pub async fn tool_catalog(&self) -> Result<Vec<serde_json::Value>, PluginError> {
1017 self.sessions.tool_catalog(&self.session_id).await
1018 }
1019
1020 pub async fn shared_tool_catalog(
1021 &self,
1022 ) -> Result<std::sync::Arc<Vec<serde_json::Value>>, PluginError> {
1023 self.sessions.shared_tool_catalog(&self.session_id).await
1024 }
1025}
1026
/// Input to `ToolProvider::prepare_tool_call`: the pending call and the prepare context.
pub struct ToolPrepareCall<'a> {
    /// The pending tool call to prepare.
    pub pending: crate::sansio::PendingToolCall,
    /// Context available during preparation.
    pub context: &'a ToolPrepareContext,
}
1032
/// Input to `ToolProvider::execute`: the tool name, its arguments, the
/// execution context, and an optional progress channel.
pub struct ToolCall<'a> {
    /// Name of the tool being invoked.
    pub name: &'a str,
    /// JSON arguments for the call.
    pub args: &'a serde_json::Value,
    /// Execution context for this call.
    pub context: &'a ToolContext<'a>,
    /// Optional sender for progress messages.
    pub progress: Option<&'a ProgressSender>,
}
1045
1046#[async_trait::async_trait]
1054pub trait ToolProvider: Send + Sync + 'static {
1055 fn tool_manifests(&self) -> Vec<ToolManifest>;
1056 fn resolve_manifest(&self, name: &str) -> Option<ToolManifest> {
1057 self.tool_manifests()
1058 .into_iter()
1059 .find(|manifest| manifest.name == name)
1060 }
1061 fn resolve_contract(&self, name: &str) -> Option<Arc<ToolContract>>;
1062 async fn prepare_tool_call(
1063 &self,
1064 call: ToolPrepareCall<'_>,
1065 ) -> Result<PreparedToolCall, ToolResult> {
1066 Ok(PreparedToolCall::identity(call.pending))
1067 }
1068 async fn execute(&self, call: ToolCall<'_>) -> ToolResult;
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073 use super::*;
1074 use crate::ProcessRegistry;
1075 use crate::RuntimeEffectController;
1076 use std::sync::atomic::{AtomicU64, Ordering};
1077
1078 struct NoDurableEffectController;
1079
1080 #[async_trait::async_trait]
1081 impl crate::RuntimeEffectController for NoDurableEffectController {
1082 async fn execute_effect(
1083 &self,
1084 _envelope: crate::RuntimeEffectEnvelope,
1085 _local_executor: crate::RuntimeEffectLocalExecutor<'_>,
1086 ) -> Result<crate::RuntimeEffectOutcome, crate::RuntimeEffectControllerError> {
1087 Err(crate::RuntimeEffectControllerError::new(
1088 "unexpected_effect",
1089 "test controller should not execute effects",
1090 ))
1091 }
1092 }
1093
1094 fn test_context_with_controller(
1095 tool_call_id: Option<String>,
1096 controller: Arc<dyn crate::RuntimeEffectController>,
1097 ) -> ToolContext<'static> {
1098 ToolContext::builder(
1099 "session-1".to_string(),
1100 Arc::new(crate::testing::MockSessionManager::default()),
1101 Arc::new(crate::testing::MockSessionManager::default()),
1102 Arc::new(crate::testing::MockSessionManager::default()),
1103 Arc::new(crate::UnavailableProcessService),
1104 Arc::new(crate::DefaultProcessCancelAbility),
1105 crate::runtime::RuntimeEffectControllerHandle::shared(controller),
1106 Arc::new(crate::InMemoryAttachmentStore::new()),
1107 crate::DirectCompletionClient::unavailable(
1108 "direct completions are unavailable in this test context",
1109 ),
1110 )
1111 .tool_call_id(tool_call_id)
1112 .build()
1113 }
1114
/// Builds a context with a prepared call, a cancellation token, and an async
/// process, then checks that each piece can be read back from the context.
#[test]
fn tool_context_builder_carries_call_payload_and_cancellation_state() {
    let cancellation = tokio_util::sync::CancellationToken::new();
    let prepared = PreparedToolCall::from_parts(
        "call-1",
        "demo_tool",
        serde_json::json!({ "input": true }),
        None,
        serde_json::json!({ "prepared": true }),
    );

    let context = ToolContext::builder(
        "session-1".to_string(),
        Arc::new(crate::testing::MockSessionManager::default()),
        Arc::new(crate::testing::MockSessionManager::default()),
        Arc::new(crate::testing::MockSessionManager::default()),
        Arc::new(crate::UnavailableProcessService),
        Arc::new(crate::DefaultProcessCancelAbility),
        crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
            crate::InlineRuntimeEffectController,
        )),
        Arc::new(crate::InMemoryAttachmentStore::new()),
        crate::DirectCompletionClient::unavailable(
            "direct completions are unavailable in this test context",
        ),
    )
    .prepared_call(&prepared)
    // One clone feeds the explicit token slot; the original handle is then
    // moved into the async-process registration because nothing uses it later.
    .cancellation_token(Some(cancellation.clone()))
    .async_process("process-1", cancellation)
    .build();

    assert_eq!(context.session_id(), "session-1");
    // The call id and prepared payload both come from `prepared`.
    assert_eq!(context.tool_call_id(), Some("call-1"));
    assert_eq!(
        context.prepared_payload(),
        &serde_json::json!({ "prepared": true })
    );
    assert_eq!(context.async_process_id(), Some("process-1"));
    assert!(context.cancellation_token().is_some());
}
1155
/// `durable_effects()` must fail when the context has no tool call id, and must
/// fail when the controller is `NoDurableEffectController`.
#[test]
fn durable_effects_requires_prepared_call_id_and_supporting_controller() {
    // Case 1: inline controller, but no tool call id on the context.
    let without_call_id =
        test_context_with_controller(None, Arc::new(crate::InlineRuntimeEffectController));
    let Err(missing_id_err) = without_call_id.durable_effects() else {
        panic!("missing prepared tool call id should fail");
    };
    assert_eq!(
        missing_id_err.code.as_str(),
        "durable_effects_missing_call_id"
    );

    // Case 2: a call id is present, but the controller rejects every effect.
    let with_stub_controller = test_context_with_controller(
        Some(String::from("call-1")),
        Arc::new(NoDurableEffectController),
    );
    let Err(unavailable_err) = with_stub_controller.durable_effects() else {
        panic!("unsupported controller should fail");
    };
    assert_eq!(unavailable_err.code.as_str(), "durable_effects_unavailable");
}
1176
/// Exercises `run_json` in both directions: a successful step returns the
/// closure's JSON value, and a failing step surfaces the closure's error code
/// and message.
#[tokio::test]
async fn durable_run_json_executes_and_maps_closure_errors() {
    let context = test_context_with_controller(
        Some("call-run-json".to_string()),
        Arc::new(crate::InlineRuntimeEffectController),
    );
    let durable = context.durable_effects().expect("durable effects");
    // Success path: the closure echoes `input["x"]` back under the `seen` key.
    let value = durable
        .run_json(
            "create",
            serde_json::json!({ "x": 1 }),
            |input| async move { Ok(serde_json::json!({ "seen": input["x"] })) },
        )
        .await
        .expect("durable step");
    assert_eq!(value, serde_json::json!({ "seen": 1 }));

    // Failure path: the closure's error must come back with its code and
    // message unchanged.
    let err = durable
        .run_json("fail", serde_json::json!({}), |_| async {
            Err(crate::RuntimeError::new(
                "durable_step_failed",
                "step failed",
            ))
        })
        .await
        .expect_err("closure error");
    assert_eq!(err.code.as_str(), "durable_step_failed");
    assert_eq!(err.message, "step failed");
}
1206
/// Checks that `run_json` rejects an empty step id and a repeated step id
/// without invoking the step closure, using a shared counter to observe runs.
#[tokio::test]
async fn durable_run_json_rejects_empty_or_duplicate_step_ids_before_running() {
    let context = test_context_with_controller(
        Some("call-step-ids".to_string()),
        Arc::new(crate::InlineRuntimeEffectController),
    );
    let durable = context.durable_effects().expect("durable effects");
    // Counts how many times any step closure actually executed.
    let runs = Arc::new(AtomicU64::new(0));

    // Empty step id: rejected, and the counter must still be 0 afterwards.
    let err = durable
        .run_json("", serde_json::Value::Null, {
            let runs = Arc::clone(&runs);
            move |_| async move {
                runs.fetch_add(1, Ordering::Relaxed);
                Ok(serde_json::Value::Null)
            }
        })
        .await
        .expect_err("empty step id");
    assert_eq!(err.code.as_str(), "durable_effect_empty_step_id");
    assert_eq!(runs.load(Ordering::Relaxed), 0);

    // First use of step id `same` succeeds and runs its closure once.
    durable
        .run_json("same", serde_json::Value::Null, {
            let runs = Arc::clone(&runs);
            move |_| async move {
                runs.fetch_add(1, Ordering::Relaxed);
                Ok(serde_json::Value::Null)
            }
        })
        .await
        .expect("first step");
    // Reusing `same` on the same handle is rejected; the counter stays at 1,
    // so the second closure never ran.
    let err = durable
        .run_json("same", serde_json::Value::Null, {
            let runs = Arc::clone(&runs);
            move |_| async move {
                runs.fetch_add(1, Ordering::Relaxed);
                Ok(serde_json::Value::Null)
            }
        })
        .await
        .expect_err("duplicate step id");
    assert_eq!(err.code.as_str(), "durable_effect_duplicate_step_id");
    assert_eq!(runs.load(Ordering::Relaxed), 1);
}
1252
1253 #[tokio::test]
1254 async fn durable_external_event_key_is_custom_and_stable() {
1255 let context = test_context_with_controller(
1256 Some("call-event-key".to_string()),
1257 Arc::new(crate::InlineRuntimeEffectController),
1258 );
1259 let durable = context.durable_effects().expect("durable effects");
1260 let first = durable
1261 .external_event_key("tool-event-stable")
1262 .await
1263 .expect("first key");
1264 let second = durable
1265 .external_event_key("tool-event-stable")
1266 .await
1267 .expect("second key");
1268
1269 assert_eq!(first, second);
1270 assert_eq!(
1271 first.wait,
1272 crate::AwaitEventWaitIdentity::Custom {
1273 key: "tool-event-stable".to_string()
1274 }
1275 );
1276 }
1277
/// Resolves four external events through the controller (`Ok`, `Err`,
/// `Cancelled`, `Timeout`) and checks what `await_event_json` returns for each.
/// Every key is resolved before it is awaited.
#[tokio::test]
async fn durable_await_event_json_maps_terminal_resolutions() {
    // Keep a concrete handle so the test can call `resolve_await_event` directly.
    let controller = Arc::new(crate::InlineRuntimeEffectController);
    let context =
        test_context_with_controller(Some("call-await-event".to_string()), controller.clone());
    let durable = context.durable_effects().expect("durable effects");

    // `Resolution::Ok` yields the resolved JSON value.
    let ok_key = durable
        .external_event_key("tool-event-ok")
        .await
        .expect("ok key");
    controller
        .resolve_await_event(
            &ok_key,
            crate::Resolution::Ok(serde_json::json!({ "answer": 42 })),
        )
        .await
        .expect("resolve ok");
    let value = durable
        .await_event_json(ok_key)
        .await
        .expect("await ok value");
    assert_eq!(value, serde_json::json!({ "answer": 42 }));

    // `Resolution::Err` surfaces the external error's own code.
    let err_key = durable
        .external_event_key("tool-event-err")
        .await
        .expect("err key");
    controller
        .resolve_await_event(
            &err_key,
            crate::Resolution::Err(crate::ExternalCompletionError::new(
                "external_bad",
                "external failed",
            )),
        )
        .await
        .expect("resolve err");
    let err = durable
        .await_event_json(err_key)
        .await
        .expect_err("await err value");
    assert_eq!(err.code.as_str(), "external_bad");

    // `Resolution::Cancelled` becomes the `durable_effect_event_cancelled` error.
    let cancelled_key = durable
        .external_event_key("tool-event-cancelled")
        .await
        .expect("cancelled key");
    controller
        .resolve_await_event(&cancelled_key, crate::Resolution::Cancelled)
        .await
        .expect("resolve cancelled");
    let err = durable
        .await_event_json(cancelled_key)
        .await
        .expect_err("await cancelled value");
    assert_eq!(err.code.as_str(), "durable_effect_event_cancelled");

    // `Resolution::Timeout` becomes the `durable_effect_event_timeout` error.
    let timeout_key = durable
        .external_event_key("tool-event-timeout")
        .await
        .expect("timeout key");
    controller
        .resolve_await_event(&timeout_key, crate::Resolution::Timeout)
        .await
        .expect("resolve timeout");
    let err = durable
        .await_event_json(timeout_key)
        .await
        .expect_err("await timeout value");
    assert_eq!(err.code.as_str(), "durable_effect_event_timeout");
}
1350
/// Covers three `emit_process_event` outcomes: failure when the context has no
/// process events attached, success for a declared event type inside a
/// registered process, and failure for an undeclared event type.
#[tokio::test]
async fn durable_emit_process_event_requires_process_and_appends_inside_process() {
    // A plain test context has no process attached, so emitting must fail.
    let context = test_context_with_controller(
        Some("call-no-process".to_string()),
        Arc::new(crate::InlineRuntimeEffectController),
    );
    let err = context
        .durable_effects()
        .expect("durable effects")
        .emit_process_event("tool.event", serde_json::json!({}))
        .await
        .expect_err("outside process");
    assert_eq!(
        err.code.as_str(),
        "durable_effect_process_event_unavailable"
    );

    // Register a process that declares `tool.event` as an extra event type.
    let registry = Arc::new(crate::TestLocalProcessRegistry::default());
    let process_id = "process:durable-tool-event";
    registry
        .register_process(
            crate::ProcessRegistration::new(
                process_id,
                crate::ProcessInput::External {
                    metadata: serde_json::json!({}),
                },
                crate::ProcessProvenance::host(),
            )
            .with_extra_event_types([crate::ProcessEventType {
                name: "tool.event".to_string(),
                payload_schema: crate::LashSchema::any(),
                semantics: crate::ProcessEventSemanticsSpec::default(),
            }]),
        )
        .await
        .expect("register process");
    // Convert to the trait object expected by `with_process_events_for_testing`.
    let registry_dyn: Arc<dyn crate::ProcessRegistry> = registry;
    let context = test_context_with_controller(
        Some("call-process-event".to_string()),
        Arc::new(crate::InlineRuntimeEffectController),
    )
    .with_process_events_for_testing(process_id, registry_dyn);

    // Declared event type: the returned event carries the process id, type,
    // payload, and a replay key that embeds the tool call id.
    let event = context
        .durable_effects()
        .expect("durable effects")
        .emit_process_event("tool.event", serde_json::json!({ "ok": true }))
        .await
        .expect("process event");
    assert_eq!(event.process_id, process_id);
    assert_eq!(event.event_type, "tool.event");
    assert_eq!(event.payload, serde_json::json!({ "ok": true }));
    assert_eq!(
        event.invocation.replay_key(),
        Some("tool:call-process-event:durable-process-event:0")
    );

    // Undeclared event type: the append is rejected with a dedicated error code.
    let append_err = context
        .durable_effects()
        .expect("durable effects")
        .emit_process_event("undeclared.event", serde_json::json!({}))
        .await
        .expect_err("undeclared event type must fail the append");
    assert_eq!(
        append_err.code.as_str(),
        "durable_effect_process_event_append_failed"
    );
}
1419}