Skip to main content

juncture_core/
tools.rs

// Tool system for agent function calling
//
// This module provides the tool abstraction and ToolNode implementation
// for ReAct-style agent workflows.
//
// # Design Principles
//
// - Unified interface: Single trait for all tools
// - Concurrent execution: Multiple tools can execute in parallel
// - Error resilience: Failed tools return error messages to LLM
// - Type-safe: Tool inputs/outputs use serde JSON for validation
12
13use async_trait::async_trait;
14use futures::future::BoxFuture;
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use crate::config::RunnableConfig;
19use crate::llm::ToolDefinition;
20use crate::state::{Message, State};
21use crate::store::Store;
22
23/// Tool execution error types
24#[derive(Debug, Clone, thiserror::Error)]
25pub enum ToolError {
26    /// Invalid input to tool
27    #[error("invalid input: {0}")]
28    InvalidInput(String),
29
30    /// Tool execution failed
31    #[error("execution failed: {0}")]
32    ExecutionFailed(String),
33
34    /// Tool execution timeout
35    #[error("timeout")]
36    Timeout,
37
38    /// Tool not found
39    #[error("tool not found: {0}")]
40    ToolNotFound(String),
41
42    /// Validation error
43    #[error("validation error: {0}")]
44    ValidationError(String),
45}
46
47/// Unified tool trait
48///
49/// All tools must implement this trait to be used with `ToolNode`.
50///
51/// Note: This trait does not implement Debug as it's an async trait intended
52/// for dynamic dispatch via trait objects.
53#[async_trait]
54pub trait Tool: Send + Sync + 'static {
55    /// Get tool name
56    fn name(&self) -> &str;
57
58    /// Get tool description
59    fn description(&self) -> &str;
60
61    /// Get JSON Schema for tool parameters
62    fn schema(&self) -> serde_json::Value;
63
64    /// Get tool definition
65    fn definition(&self) -> ToolDefinition {
66        ToolDefinition {
67            name: self.name().to_string(),
68            description: self.description().to_string(),
69            parameters: self.schema(),
70        }
71    }
72
73    /// Execute the tool
74    ///
75    /// # Arguments
76    ///
77    /// * `input` - Tool input as JSON value (validated against schema)
78    ///
79    /// # Returns
80    ///
81    /// Tool output as string
82    async fn invoke(&self, input: serde_json::Value) -> Result<String, ToolError>;
83
84    /// Check if this tool requires Store access
85    ///
86    /// Tools that need persistent cross-thread storage should return `true`.
87    /// The default implementation returns `false`.
88    ///
89    /// # Returns
90    ///
91    /// `true` if the tool requires Store access, `false` otherwise
92    #[must_use]
93    fn requires_store(&self) -> bool {
94        false
95    }
96
97    /// Execute the tool with Store access
98    ///
99    /// This method is called instead of `invoke()` when `requires_store()` returns `true`.
100    /// The default implementation delegates to `invoke()`, ignoring the Store parameter.
101    ///
102    /// # Arguments
103    ///
104    /// * `input` - Tool input as JSON value (validated against schema)
105    /// * `store` - Store for cross-thread persistent data access
106    ///
107    /// # Returns
108    ///
109    /// Tool output as string
110    fn invoke_with_store<'a>(
111        &'a self,
112        input: serde_json::Value,
113        _store: &'a dyn crate::store::Store,
114    ) -> BoxFuture<'a, Result<String, ToolError>>
115    where
116        Self: 'a,
117    {
118        // Default implementation delegates to invoke() and ignores the store
119        Box::pin(async move {
120            let result = self.invoke(input).await?;
121            Ok(result)
122        })
123    }
124}
125
126/// Tool runtime context injected into tool execution
127///
128/// Provides access to graph state, configuration, and store during tool execution.
129#[allow(
130    missing_debug_implementations,
131    reason = "Contains dyn Store trait object which doesn't implement Debug"
132)]
133pub struct ToolRuntime<S: State> {
134    /// Current graph state (read-only snapshot)
135    pub state: S,
136    /// Current tool call ID
137    pub tool_call_id: String,
138    /// Runtime configuration
139    pub config: RunnableConfig,
140    /// Cross-thread persistent store
141    pub store: Option<Arc<dyn Store>>,
142    /// Streaming sender for tool output deltas
143    stream_tx: Option<tokio::sync::mpsc::UnboundedSender<serde_json::Value>>,
144    /// Optional sender for tool lifecycle streaming events
145    tools_event_tx: Option<tokio::sync::mpsc::UnboundedSender<crate::stream::ToolsEvent>>,
146}
147
148impl<S: State> ToolRuntime<S> {
149    /// Create a new `ToolRuntime` instance
150    ///
151    /// # Arguments
152    ///
153    /// * `state` - Current graph state
154    /// * `tool_call_id` - Tool call identifier
155    /// * `config` - Runtime configuration
156    /// * `store` - Optional cross-thread persistent store
157    /// * `stream_tx` - Optional streaming sender for output deltas
158    /// * `tools_event_tx` - Optional streaming sender for tool lifecycle events
159    #[must_use]
160    pub const fn new(
161        state: S,
162        tool_call_id: String,
163        config: RunnableConfig,
164        store: Option<Arc<dyn Store>>,
165        stream_tx: Option<tokio::sync::mpsc::UnboundedSender<serde_json::Value>>,
166        tools_event_tx: Option<tokio::sync::mpsc::UnboundedSender<crate::stream::ToolsEvent>>,
167    ) -> Self {
168        Self {
169            state,
170            tool_call_id,
171            config,
172            store,
173            stream_tx,
174            tools_event_tx,
175        }
176    }
177
178    /// Emit tool output delta for streaming
179    ///
180    /// Allows tools to stream intermediate results during execution.
181    /// If no streaming channel is configured, this is a no-op.
182    ///
183    /// # Arguments
184    ///
185    /// * `delta` - Output delta fragment to stream
186    pub fn emit_output_delta(&self, delta: &str) {
187        if let Some(ref tx) = self.stream_tx {
188            let _ = tx.send(serde_json::json!({
189                "delta": delta,
190                "tool_call_id": self.tool_call_id
191            }));
192        }
193    }
194
195    /// Emit tool started lifecycle event
196    ///
197    /// Sends a [`ToolsEvent::ToolStarted`] through the tools event channel
198    /// when one is configured. If no channel is available, this is a no-op.
199    ///
200    /// # Arguments
201    ///
202    /// * `tool_name` - Name of the tool being started
203    /// * `node` - Node name where the tool is executing
204    /// * `input` - Tool input as JSON value
205    pub fn emit_tool_started(&self, tool_name: &str, node: &str, input: serde_json::Value) {
206        if let Some(ref tx) = self.tools_event_tx {
207            let event = crate::stream::ToolsEvent::ToolStarted {
208                tool_name: tool_name.to_string(),
209                tool_call_id: self.tool_call_id.clone(),
210                node: node.to_string(),
211                input,
212                timestamp: chrono::Utc::now(),
213            };
214            let _ = tx.send(event);
215        }
216    }
217
218    /// Emit tool finished lifecycle event
219    ///
220    /// Sends a [`ToolsEvent::ToolFinished`] through the tools event channel
221    /// when one is configured. If no channel is available, this is a no-op.
222    ///
223    /// # Arguments
224    ///
225    /// * `output` - Tool output as JSON value
226    /// * `duration_ms` - Execution duration in milliseconds
227    /// * `success` - Whether the tool execution succeeded
228    pub fn emit_tool_finished(&self, output: serde_json::Value, duration_ms: u64, success: bool) {
229        if let Some(ref tx) = self.tools_event_tx {
230            let event = crate::stream::ToolsEvent::ToolFinished {
231                tool_call_id: self.tool_call_id.clone(),
232                output,
233                duration_ms,
234                success,
235            };
236            let _ = tx.send(event);
237        }
238    }
239}
240
241/// Stateful tool trait for tools that need graph state access
242///
243/// Tools can access the current graph state during execution.
244///
245/// Note: This trait does not implement Debug as it's an async trait intended
246/// for dynamic dispatch via trait objects.
247#[async_trait]
248pub trait StatefulTool<S: State>: Tool {
249    /// Execute with state access
250    ///
251    /// # Arguments
252    ///
253    /// * `input` - Tool input
254    /// * `runtime` - Runtime context with state access
255    ///
256    /// # Returns
257    ///
258    /// Tool output as string
259    fn invoke_with_state(
260        &self,
261        input: serde_json::Value,
262        runtime: &ToolRuntime<S>,
263    ) -> BoxFuture<'_, Result<String, ToolError>>;
264
265    /// Override `invoke_with_store` to use state access when available
266    ///
267    /// This default implementation calls the base Tool trait's `invoke_with_store`.
268    /// Tools that need both state and store access can override this method.
269    ///
270    /// # Arguments
271    ///
272    /// * `input` - Tool input
273    /// * `store` - Store for cross-thread data access
274    ///
275    /// # Returns
276    ///
277    /// Tool output as string
278    fn invoke_with_store<'a>(
279        &'a self,
280        input: serde_json::Value,
281        store: &'a dyn crate::store::Store,
282    ) -> BoxFuture<'a, Result<String, ToolError>>
283    where
284        Self: 'a,
285    {
286        // Delegate to the base Tool trait implementation
287        Tool::invoke_with_store(self, input, store)
288    }
289}
290
291/// Tool call interceptor trait
292///
293/// Allows injecting custom logic before and after tool execution.
294///
295/// Note: This trait does not implement Debug as it's an async trait intended
296/// for dynamic dispatch via trait objects.
297#[async_trait]
298pub trait ToolInterceptor: Send + Sync + 'static {
299    /// Called before tool execution
300    ///
301    /// Return Err to cancel tool execution with error message.
302    fn pre_execute(
303        &self,
304        tool_call: &crate::state::ToolCall,
305        state: &serde_json::Value,
306    ) -> BoxFuture<'_, Result<(), ToolError>>;
307
308    /// Called after tool execution
309    ///
310    /// Can modify the tool result.
311    fn post_execute(
312        &self,
313        tool_call: &crate::state::ToolCall,
314        result: &Result<String, ToolError>,
315    ) -> BoxFuture<'_, Result<String, ToolError>>;
316}
317
/// Interceptor that adds no custom logic of its own.
#[derive(Debug)]
pub struct NopToolInterceptor;
321
322#[async_trait]
323impl ToolInterceptor for NopToolInterceptor {
324    fn pre_execute(
325        &self,
326        _tool_call: &crate::state::ToolCall,
327        _state: &serde_json::Value,
328    ) -> BoxFuture<'_, Result<(), ToolError>> {
329        Box::pin(async { Ok(()) })
330    }
331
332    fn post_execute(
333        &self,
334        _tool_call: &crate::state::ToolCall,
335        result: &Result<String, ToolError>,
336    ) -> BoxFuture<'_, Result<String, ToolError>> {
337        let result_clone = result.clone();
338        Box::pin(async move { result_clone.map_err(|e| ToolError::ExecutionFailed(e.to_string())) })
339    }
340}
341
342/// Tool call transformer trait
343///
344/// Allows transforming tool call parameters before execution.
345///
346/// Note: This trait does not implement Debug as it's intended for dynamic
347/// dispatch via trait objects.
348pub trait ToolCallTransformer: Send + Sync + 'static {
349    /// Transform the tool call
350    ///
351    /// # Errors
352    ///
353    /// Returns `ToolError` if the transformation fails.
354    fn transform(&self, tool_call: &mut crate::state::ToolCall) -> Result<(), ToolError>;
355}
356
357/// Tool node configuration
358#[allow(
359    missing_debug_implementations,
360    clippy::type_complexity,
361    reason = "Contains trait objects and Arc<dyn Fn> which don't implement Debug. Complex trait object type is required for dynamic tool configuration."
362)]
363pub struct ToolNodeConfig {
364    /// List of tools
365    pub tools: Vec<Box<dyn Tool>>,
366    /// Handle errors by returning them to LLM (true) or failing (false)
367    pub handle_errors: bool,
368    /// Validate tool inputs against schema before execution
369    pub validate_input: bool,
370    /// Optional tool call transformer
371    pub call_transformer: Option<Box<dyn ToolCallTransformer>>,
372    /// Optional tool call interceptor
373    pub interceptor: Option<Arc<dyn ToolInterceptor>>,
374    /// Optional tools condition function
375    pub tools_condition: Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>,
376}
377
378impl Default for ToolNodeConfig {
379    fn default() -> Self {
380        Self {
381            tools: vec![],
382            handle_errors: true,
383            validate_input: false,
384            call_transformer: None,
385            interceptor: None,
386            tools_condition: None,
387        }
388    }
389}
390
391/// Tool node for executing function calls
392///
393/// Extracts tool calls from the last AI message and executes them.
394#[allow(
395    missing_debug_implementations,
396    reason = "Contains trait objects which don't implement Debug"
397)]
398pub struct ToolNode {
399    /// Tool registry
400    #[expect(dead_code, reason = "Used in tool execution")]
401    tools: HashMap<String, Box<dyn Tool>>,
402    /// Error handling mode
403    handle_errors: bool,
404    /// Input validation
405    validate_input: bool,
406    /// Optional tool call transformer
407    call_transformer: Option<Box<dyn ToolCallTransformer>>,
408    /// Optional interceptor
409    interceptor: Option<Arc<dyn ToolInterceptor>>,
410}
411
412impl ToolNode {
413    /// Create new tool node from tools
414    #[must_use]
415    pub fn new(tools: Vec<Box<dyn Tool>>) -> Self {
416        let tool_map = tools
417            .into_iter()
418            .map(|t| (t.name().to_string(), t))
419            .collect();
420
421        Self {
422            tools: tool_map,
423            handle_errors: true,
424            validate_input: false,
425            call_transformer: None,
426            interceptor: None,
427        }
428    }
429
430    /// Create tool node from config
431    #[must_use]
432    pub fn from_config(config: ToolNodeConfig) -> Self {
433        let tool_map = config
434            .tools
435            .into_iter()
436            .map(|t| (t.name().to_string(), t))
437            .collect();
438
439        Self {
440            tools: tool_map,
441            handle_errors: config.handle_errors,
442            validate_input: config.validate_input,
443            call_transformer: config.call_transformer,
444            interceptor: config.interceptor,
445        }
446    }
447
448    /// Set error handling mode
449    #[must_use]
450    pub const fn with_error_handling(mut self, handle: bool) -> Self {
451        self.handle_errors = handle;
452        self
453    }
454
455    /// Enable input validation
456    #[must_use]
457    pub const fn with_validation(mut self, validate: bool) -> Self {
458        self.validate_input = validate;
459        self
460    }
461
462    /// Set tool call transformer
463    #[must_use]
464    pub fn with_transformer(mut self, transformer: Box<dyn ToolCallTransformer>) -> Self {
465        self.call_transformer = Some(transformer);
466        self
467    }
468
469    /// Set interceptor
470    #[must_use]
471    pub fn with_interceptor(mut self, interceptor: Arc<dyn ToolInterceptor>) -> Self {
472        self.interceptor = Some(interceptor);
473        self
474    }
475}
476
477/// Tool execution trace record
478#[derive(Debug, Clone)]
479pub struct ToolExecutionTrace {
480    /// Tool name
481    pub tool_name: String,
482    /// Tool call ID
483    pub tool_call_id: String,
484    /// Attempt number
485    pub attempt: usize,
486    /// First attempt timestamp
487    pub first_attempt_time: chrono::DateTime<chrono::Utc>,
488    /// Execution duration in milliseconds
489    pub duration_ms: u64,
490    /// Success flag
491    pub success: bool,
492}
493
494/// Validate tool input against schema
495#[expect(dead_code, reason = "Used in tool execution validation")]
496fn validate_tool_input(tool: &dyn Tool, input: &serde_json::Value) -> Result<(), ToolError> {
497    let schema = tool.schema();
498
499    // Basic JSON schema validation
500    if let Some(obj) = input.as_object()
501        && let Some(schema_obj) = schema.as_object()
502        && let Some(required) = schema_obj.get("required").and_then(|v| v.as_array())
503    {
504        for field in required {
505            if let Some(field_name) = field.as_str()
506                && !obj.contains_key(field_name)
507            {
508                return Err(ToolError::ValidationError(format!(
509                    "Missing required field: {field_name}",
510                )));
511            }
512        }
513    }
514
515    Ok(())
516}
517
518/// Tools condition router function
519///
520/// Standard routing function for `ReAct` agents.
521/// Routes to "tools" node if last message has `tool_calls`, otherwise to END.
522///
523/// # Arguments
524///
525/// * `state` - Graph state
526/// * `messages_field` - Name of messages field in state
527///
528/// # Returns
529///
530/// Target node name ("tools" or END)
531///
532/// # Examples
533///
534/// ```ignore
535/// graph.add_conditional_edges(
536///     "agent",
537///     |state: &MyState| tools_condition(state, "messages"),
538///     path_map! {
539///         "tools" => "tools",
540///         END => END,
541///     },
542/// );
543/// ```
544pub fn tools_condition<S: State + serde::Serialize>(
545    state: &S,
546    messages_field: &str,
547) -> &'static str {
548    if has_pending_tool_calls(state, messages_field) {
549        "tools"
550    } else {
551        crate::END
552    }
553}
554
555/// Check if the last AI message in state has pending tool calls.
556///
557/// Serializes the state to JSON, extracts the messages array from the named
558/// field, and checks whether the last message with role `Ai` has non-empty
559/// `tool_calls`. The role comparison uses the serde-serialized form
560/// `"Ai"` (the variant name, since [`Role::Ai`](crate::state::messages::Role::Ai)
561/// has no serde rename in the current implementation).
562fn has_pending_tool_calls<S: serde::Serialize>(state: &S, messages_field: &str) -> bool {
563    let Ok(value) = serde_json::to_value(state) else {
564        return false;
565    };
566
567    let Some(messages) = value.get(messages_field).and_then(|v| v.as_array()) else {
568        return false;
569    };
570
571    // Walk backwards to find the last AI message
572    for msg in messages.iter().rev() {
573        let role = msg.get("role").and_then(|v| v.as_str()).unwrap_or("");
574        if role == "Ai" {
575            return msg
576                .get("tool_calls")
577                .and_then(|v| v.as_array())
578                .is_some_and(|arr| !arr.is_empty());
579        }
580    }
581
582    false
583}
584
585// Rust guideline compliant 2026-05-20