juncture_core/tools.rs
1// Tool system for agent function calling
2//
3// This module provides the tool abstraction and ToolNode implementation
4// for ReAct-style agent workflows.
5//
6// # Design Principles
7//
8// - Unified interface: Single trait for all tools
9// - Concurrent execution: Multiple tools can execute in parallel
10// - Error resilience: Failed tools return error messages to LLM
11// - Type-safe: Tool inputs/outputs use serde JSON for validation
12
13use async_trait::async_trait;
14use futures::future::BoxFuture;
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use crate::config::RunnableConfig;
19use crate::llm::ToolDefinition;
20use crate::state::{Message, State};
21use crate::store::Store;
22
23/// Tool execution error types
24#[derive(Debug, Clone, thiserror::Error)]
25pub enum ToolError {
26 /// Invalid input to tool
27 #[error("invalid input: {0}")]
28 InvalidInput(String),
29
30 /// Tool execution failed
31 #[error("execution failed: {0}")]
32 ExecutionFailed(String),
33
34 /// Tool execution timeout
35 #[error("timeout")]
36 Timeout,
37
38 /// Tool not found
39 #[error("tool not found: {0}")]
40 ToolNotFound(String),
41
42 /// Validation error
43 #[error("validation error: {0}")]
44 ValidationError(String),
45}
46
47/// Unified tool trait
48///
49/// All tools must implement this trait to be used with `ToolNode`.
50///
51/// Note: This trait does not implement Debug as it's an async trait intended
52/// for dynamic dispatch via trait objects.
53#[async_trait]
54pub trait Tool: Send + Sync + 'static {
55 /// Get tool name
56 fn name(&self) -> &str;
57
58 /// Get tool description
59 fn description(&self) -> &str;
60
61 /// Get JSON Schema for tool parameters
62 fn schema(&self) -> serde_json::Value;
63
64 /// Get tool definition
65 fn definition(&self) -> ToolDefinition {
66 ToolDefinition {
67 name: self.name().to_string(),
68 description: self.description().to_string(),
69 parameters: self.schema(),
70 }
71 }
72
73 /// Execute the tool
74 ///
75 /// # Arguments
76 ///
77 /// * `input` - Tool input as JSON value (validated against schema)
78 ///
79 /// # Returns
80 ///
81 /// Tool output as string
82 async fn invoke(&self, input: serde_json::Value) -> Result<String, ToolError>;
83
84 /// Check if this tool requires Store access
85 ///
86 /// Tools that need persistent cross-thread storage should return `true`.
87 /// The default implementation returns `false`.
88 ///
89 /// # Returns
90 ///
91 /// `true` if the tool requires Store access, `false` otherwise
92 #[must_use]
93 fn requires_store(&self) -> bool {
94 false
95 }
96
97 /// Execute the tool with Store access
98 ///
99 /// This method is called instead of `invoke()` when `requires_store()` returns `true`.
100 /// The default implementation delegates to `invoke()`, ignoring the Store parameter.
101 ///
102 /// # Arguments
103 ///
104 /// * `input` - Tool input as JSON value (validated against schema)
105 /// * `store` - Store for cross-thread persistent data access
106 ///
107 /// # Returns
108 ///
109 /// Tool output as string
110 fn invoke_with_store<'a>(
111 &'a self,
112 input: serde_json::Value,
113 _store: &'a dyn crate::store::Store,
114 ) -> BoxFuture<'a, Result<String, ToolError>>
115 where
116 Self: 'a,
117 {
118 // Default implementation delegates to invoke() and ignores the store
119 Box::pin(async move {
120 let result = self.invoke(input).await?;
121 Ok(result)
122 })
123 }
124}
125
126/// Tool runtime context injected into tool execution
127///
128/// Provides access to graph state, configuration, and store during tool execution.
129#[allow(
130 missing_debug_implementations,
131 reason = "Contains dyn Store trait object which doesn't implement Debug"
132)]
133pub struct ToolRuntime<S: State> {
134 /// Current graph state (read-only snapshot)
135 pub state: S,
136 /// Current tool call ID
137 pub tool_call_id: String,
138 /// Runtime configuration
139 pub config: RunnableConfig,
140 /// Cross-thread persistent store
141 pub store: Option<Arc<dyn Store>>,
142 /// Streaming sender for tool output deltas
143 stream_tx: Option<tokio::sync::mpsc::UnboundedSender<serde_json::Value>>,
144 /// Optional sender for tool lifecycle streaming events
145 tools_event_tx: Option<tokio::sync::mpsc::UnboundedSender<crate::stream::ToolsEvent>>,
146}
147
148impl<S: State> ToolRuntime<S> {
149 /// Create a new `ToolRuntime` instance
150 ///
151 /// # Arguments
152 ///
153 /// * `state` - Current graph state
154 /// * `tool_call_id` - Tool call identifier
155 /// * `config` - Runtime configuration
156 /// * `store` - Optional cross-thread persistent store
157 /// * `stream_tx` - Optional streaming sender for output deltas
158 /// * `tools_event_tx` - Optional streaming sender for tool lifecycle events
159 #[must_use]
160 pub const fn new(
161 state: S,
162 tool_call_id: String,
163 config: RunnableConfig,
164 store: Option<Arc<dyn Store>>,
165 stream_tx: Option<tokio::sync::mpsc::UnboundedSender<serde_json::Value>>,
166 tools_event_tx: Option<tokio::sync::mpsc::UnboundedSender<crate::stream::ToolsEvent>>,
167 ) -> Self {
168 Self {
169 state,
170 tool_call_id,
171 config,
172 store,
173 stream_tx,
174 tools_event_tx,
175 }
176 }
177
178 /// Emit tool output delta for streaming
179 ///
180 /// Allows tools to stream intermediate results during execution.
181 /// If no streaming channel is configured, this is a no-op.
182 ///
183 /// # Arguments
184 ///
185 /// * `delta` - Output delta fragment to stream
186 pub fn emit_output_delta(&self, delta: &str) {
187 if let Some(ref tx) = self.stream_tx {
188 let _ = tx.send(serde_json::json!({
189 "delta": delta,
190 "tool_call_id": self.tool_call_id
191 }));
192 }
193 }
194
195 /// Emit tool started lifecycle event
196 ///
197 /// Sends a [`ToolsEvent::ToolStarted`] through the tools event channel
198 /// when one is configured. If no channel is available, this is a no-op.
199 ///
200 /// # Arguments
201 ///
202 /// * `tool_name` - Name of the tool being started
203 /// * `node` - Node name where the tool is executing
204 /// * `input` - Tool input as JSON value
205 pub fn emit_tool_started(&self, tool_name: &str, node: &str, input: serde_json::Value) {
206 if let Some(ref tx) = self.tools_event_tx {
207 let event = crate::stream::ToolsEvent::ToolStarted {
208 tool_name: tool_name.to_string(),
209 tool_call_id: self.tool_call_id.clone(),
210 node: node.to_string(),
211 input,
212 timestamp: chrono::Utc::now(),
213 };
214 let _ = tx.send(event);
215 }
216 }
217
218 /// Emit tool finished lifecycle event
219 ///
220 /// Sends a [`ToolsEvent::ToolFinished`] through the tools event channel
221 /// when one is configured. If no channel is available, this is a no-op.
222 ///
223 /// # Arguments
224 ///
225 /// * `output` - Tool output as JSON value
226 /// * `duration_ms` - Execution duration in milliseconds
227 /// * `success` - Whether the tool execution succeeded
228 pub fn emit_tool_finished(&self, output: serde_json::Value, duration_ms: u64, success: bool) {
229 if let Some(ref tx) = self.tools_event_tx {
230 let event = crate::stream::ToolsEvent::ToolFinished {
231 tool_call_id: self.tool_call_id.clone(),
232 output,
233 duration_ms,
234 success,
235 };
236 let _ = tx.send(event);
237 }
238 }
239}
240
241/// Stateful tool trait for tools that need graph state access
242///
243/// Tools can access the current graph state during execution.
244///
245/// Note: This trait does not implement Debug as it's an async trait intended
246/// for dynamic dispatch via trait objects.
247#[async_trait]
248pub trait StatefulTool<S: State>: Tool {
249 /// Execute with state access
250 ///
251 /// # Arguments
252 ///
253 /// * `input` - Tool input
254 /// * `runtime` - Runtime context with state access
255 ///
256 /// # Returns
257 ///
258 /// Tool output as string
259 fn invoke_with_state(
260 &self,
261 input: serde_json::Value,
262 runtime: &ToolRuntime<S>,
263 ) -> BoxFuture<'_, Result<String, ToolError>>;
264
265 /// Override `invoke_with_store` to use state access when available
266 ///
267 /// This default implementation calls the base Tool trait's `invoke_with_store`.
268 /// Tools that need both state and store access can override this method.
269 ///
270 /// # Arguments
271 ///
272 /// * `input` - Tool input
273 /// * `store` - Store for cross-thread data access
274 ///
275 /// # Returns
276 ///
277 /// Tool output as string
278 fn invoke_with_store<'a>(
279 &'a self,
280 input: serde_json::Value,
281 store: &'a dyn crate::store::Store,
282 ) -> BoxFuture<'a, Result<String, ToolError>>
283 where
284 Self: 'a,
285 {
286 // Delegate to the base Tool trait implementation
287 Tool::invoke_with_store(self, input, store)
288 }
289}
290
291/// Tool call interceptor trait
292///
293/// Allows injecting custom logic before and after tool execution.
294///
295/// Note: This trait does not implement Debug as it's an async trait intended
296/// for dynamic dispatch via trait objects.
297#[async_trait]
298pub trait ToolInterceptor: Send + Sync + 'static {
299 /// Called before tool execution
300 ///
301 /// Return Err to cancel tool execution with error message.
302 fn pre_execute(
303 &self,
304 tool_call: &crate::state::ToolCall,
305 state: &serde_json::Value,
306 ) -> BoxFuture<'_, Result<(), ToolError>>;
307
308 /// Called after tool execution
309 ///
310 /// Can modify the tool result.
311 fn post_execute(
312 &self,
313 tool_call: &crate::state::ToolCall,
314 result: &Result<String, ToolError>,
315 ) -> BoxFuture<'_, Result<String, ToolError>>;
316}
317
318/// No-op interceptor (default implementation)
319#[derive(Debug)]
320pub struct NopToolInterceptor;
321
322#[async_trait]
323impl ToolInterceptor for NopToolInterceptor {
324 fn pre_execute(
325 &self,
326 _tool_call: &crate::state::ToolCall,
327 _state: &serde_json::Value,
328 ) -> BoxFuture<'_, Result<(), ToolError>> {
329 Box::pin(async { Ok(()) })
330 }
331
332 fn post_execute(
333 &self,
334 _tool_call: &crate::state::ToolCall,
335 result: &Result<String, ToolError>,
336 ) -> BoxFuture<'_, Result<String, ToolError>> {
337 let result_clone = result.clone();
338 Box::pin(async move { result_clone.map_err(|e| ToolError::ExecutionFailed(e.to_string())) })
339 }
340}
341
342/// Tool call transformer trait
343///
344/// Allows transforming tool call parameters before execution.
345///
346/// Note: This trait does not implement Debug as it's intended for dynamic
347/// dispatch via trait objects.
348pub trait ToolCallTransformer: Send + Sync + 'static {
349 /// Transform the tool call
350 ///
351 /// # Errors
352 ///
353 /// Returns `ToolError` if the transformation fails.
354 fn transform(&self, tool_call: &mut crate::state::ToolCall) -> Result<(), ToolError>;
355}
356
357/// Tool node configuration
358#[allow(
359 missing_debug_implementations,
360 clippy::type_complexity,
361 reason = "Contains trait objects and Arc<dyn Fn> which don't implement Debug. Complex trait object type is required for dynamic tool configuration."
362)]
363pub struct ToolNodeConfig {
364 /// List of tools
365 pub tools: Vec<Box<dyn Tool>>,
366 /// Handle errors by returning them to LLM (true) or failing (false)
367 pub handle_errors: bool,
368 /// Validate tool inputs against schema before execution
369 pub validate_input: bool,
370 /// Optional tool call transformer
371 pub call_transformer: Option<Box<dyn ToolCallTransformer>>,
372 /// Optional tool call interceptor
373 pub interceptor: Option<Arc<dyn ToolInterceptor>>,
374 /// Optional tools condition function
375 pub tools_condition: Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>,
376}
377
378impl Default for ToolNodeConfig {
379 fn default() -> Self {
380 Self {
381 tools: vec![],
382 handle_errors: true,
383 validate_input: false,
384 call_transformer: None,
385 interceptor: None,
386 tools_condition: None,
387 }
388 }
389}
390
391/// Tool node for executing function calls
392///
393/// Extracts tool calls from the last AI message and executes them.
394#[allow(
395 missing_debug_implementations,
396 reason = "Contains trait objects which don't implement Debug"
397)]
398pub struct ToolNode {
399 /// Tool registry
400 #[expect(dead_code, reason = "Used in tool execution")]
401 tools: HashMap<String, Box<dyn Tool>>,
402 /// Error handling mode
403 handle_errors: bool,
404 /// Input validation
405 validate_input: bool,
406 /// Optional tool call transformer
407 call_transformer: Option<Box<dyn ToolCallTransformer>>,
408 /// Optional interceptor
409 interceptor: Option<Arc<dyn ToolInterceptor>>,
410}
411
412impl ToolNode {
413 /// Create new tool node from tools
414 #[must_use]
415 pub fn new(tools: Vec<Box<dyn Tool>>) -> Self {
416 let tool_map = tools
417 .into_iter()
418 .map(|t| (t.name().to_string(), t))
419 .collect();
420
421 Self {
422 tools: tool_map,
423 handle_errors: true,
424 validate_input: false,
425 call_transformer: None,
426 interceptor: None,
427 }
428 }
429
430 /// Create tool node from config
431 #[must_use]
432 pub fn from_config(config: ToolNodeConfig) -> Self {
433 let tool_map = config
434 .tools
435 .into_iter()
436 .map(|t| (t.name().to_string(), t))
437 .collect();
438
439 Self {
440 tools: tool_map,
441 handle_errors: config.handle_errors,
442 validate_input: config.validate_input,
443 call_transformer: config.call_transformer,
444 interceptor: config.interceptor,
445 }
446 }
447
448 /// Set error handling mode
449 #[must_use]
450 pub const fn with_error_handling(mut self, handle: bool) -> Self {
451 self.handle_errors = handle;
452 self
453 }
454
455 /// Enable input validation
456 #[must_use]
457 pub const fn with_validation(mut self, validate: bool) -> Self {
458 self.validate_input = validate;
459 self
460 }
461
462 /// Set tool call transformer
463 #[must_use]
464 pub fn with_transformer(mut self, transformer: Box<dyn ToolCallTransformer>) -> Self {
465 self.call_transformer = Some(transformer);
466 self
467 }
468
469 /// Set interceptor
470 #[must_use]
471 pub fn with_interceptor(mut self, interceptor: Arc<dyn ToolInterceptor>) -> Self {
472 self.interceptor = Some(interceptor);
473 self
474 }
475}
476
477/// Tool execution trace record
478#[derive(Debug, Clone)]
479pub struct ToolExecutionTrace {
480 /// Tool name
481 pub tool_name: String,
482 /// Tool call ID
483 pub tool_call_id: String,
484 /// Attempt number
485 pub attempt: usize,
486 /// First attempt timestamp
487 pub first_attempt_time: chrono::DateTime<chrono::Utc>,
488 /// Execution duration in milliseconds
489 pub duration_ms: u64,
490 /// Success flag
491 pub success: bool,
492}
493
494/// Validate tool input against schema
495#[expect(dead_code, reason = "Used in tool execution validation")]
496fn validate_tool_input(tool: &dyn Tool, input: &serde_json::Value) -> Result<(), ToolError> {
497 let schema = tool.schema();
498
499 // Basic JSON schema validation
500 if let Some(obj) = input.as_object()
501 && let Some(schema_obj) = schema.as_object()
502 && let Some(required) = schema_obj.get("required").and_then(|v| v.as_array())
503 {
504 for field in required {
505 if let Some(field_name) = field.as_str()
506 && !obj.contains_key(field_name)
507 {
508 return Err(ToolError::ValidationError(format!(
509 "Missing required field: {field_name}",
510 )));
511 }
512 }
513 }
514
515 Ok(())
516}
517
518/// Tools condition router function
519///
520/// Standard routing function for `ReAct` agents.
521/// Routes to "tools" node if last message has `tool_calls`, otherwise to END.
522///
523/// # Arguments
524///
525/// * `state` - Graph state
526/// * `messages_field` - Name of messages field in state
527///
528/// # Returns
529///
530/// Target node name ("tools" or END)
531///
532/// # Examples
533///
534/// ```ignore
535/// graph.add_conditional_edges(
536/// "agent",
537/// |state: &MyState| tools_condition(state, "messages"),
538/// path_map! {
539/// "tools" => "tools",
540/// END => END,
541/// },
542/// );
543/// ```
544pub fn tools_condition<S: State + serde::Serialize>(
545 state: &S,
546 messages_field: &str,
547) -> &'static str {
548 if has_pending_tool_calls(state, messages_field) {
549 "tools"
550 } else {
551 crate::END
552 }
553}
554
555/// Check if the last AI message in state has pending tool calls.
556///
557/// Serializes the state to JSON, extracts the messages array from the named
558/// field, and checks whether the last message with role `Ai` has non-empty
559/// `tool_calls`. The role comparison uses the serde-serialized form
560/// `"Ai"` (the variant name, since [`Role::Ai`](crate::state::messages::Role::Ai)
561/// has no serde rename in the current implementation).
562fn has_pending_tool_calls<S: serde::Serialize>(state: &S, messages_field: &str) -> bool {
563 let Ok(value) = serde_json::to_value(state) else {
564 return false;
565 };
566
567 let Some(messages) = value.get(messages_field).and_then(|v| v.as_array()) else {
568 return false;
569 };
570
571 // Walk backwards to find the last AI message
572 for msg in messages.iter().rev() {
573 let role = msg.get("role").and_then(|v| v.as_str()).unwrap_or("");
574 if role == "Ai" {
575 return msg
576 .get("tool_calls")
577 .and_then(|v| v.as_array())
578 .is_some_and(|arr| !arr.is_empty());
579 }
580 }
581
582 false
583}
584
585// Rust guideline compliant 2026-05-20