Skip to main content

neuron_turn_kit/
lib.rs

1//! neuron-turn-kit
2//!
3//! Planning and execution primitives for a single agent turn — focused on
4//! sequencing and concurrency only. These traits are intentionally narrow and
5//! do not bake in provider streaming, hooks, or operator concerns; those live
6//! at higher layers (e.g., operator implementations).
7//!
8//! Contents:
9//! - Concurrency and ConcurrencyDecider — classify tool calls as Shared vs Exclusive
10//! - ToolExecutionPlanner and BarrierPlanner — sequence tool calls into batches
11//! - SteeringSource — optional source of mid-loop steering messages
12//! - BatchExecutor — run batches, with a simple sequential baseline executor
13//!
14//! Example: planning with a barrier.
15//! ```rust
16//! use neuron_turn_kit::{BarrierPlanner, Concurrency, ConcurrencyDecider, ToolExecutionPlanner};
17//!
18//! struct SharedIfStartsWith;
19//! impl ConcurrencyDecider for SharedIfStartsWith {
20//!     fn concurrency(&self, tool_name: &str) -> Concurrency {
21//!         if tool_name.starts_with("shared_") { Concurrency::Shared } else { Concurrency::Exclusive }
22//!     }
23//! }
24//!
25//! let calls = vec![
26//!     ("1".to_string(), "shared_a".to_string(), serde_json::json!({})),
27//!     ("2".to_string(), "exclusive".to_string(), serde_json::json!({})),
28//!     ("3".to_string(), "shared_b".to_string(), serde_json::json!({})),
29//! ];
30//! let planner = BarrierPlanner;
31//! let plan = planner.plan(&calls, &SharedIfStartsWith);
32//! assert!(matches!(plan[0], neuron_turn_kit::BatchItem::Shared(_)));
33//! ```
34
35use serde_json::Value;
36
37/// Concurrency hint for tool scheduling (strategy-defined).
38#[derive(Clone, Copy, Debug, PartialEq, Eq)]
39pub enum Concurrency {
40    /// Safe to run alongside other shared tools in the same batch.
41    Shared,
42    /// Must run alone (barrier before and after).
43    Exclusive,
44}
45
46/// Decide concurrency for a tool by name.
47pub trait ConcurrencyDecider: Send + Sync {
48    /// Return the concurrency class for a tool by name.
49    fn concurrency(&self, tool_name: &str) -> Concurrency;
50}
51
52/// Optional source of steering messages to inject mid-loop (provider-formatted).
53///
54/// Boundary: this is not a "hook" — it does not inspect internal state or
55/// events. It is a narrow bridge for external steering signals only.
56pub trait SteeringSource: Send + Sync {
57    /// Drain any available provider-formatted messages to inject.
58    fn drain(&self) -> Vec<neuron_turn::types::ProviderMessage>;
59}
60
61/// Planned batches for a turn.
62#[derive(Debug, Clone)]
63pub enum BatchItem {
64    /// Batch of tools that may execute in parallel (shared).
65    /// Each entry is (id, name, input_json).
66    Shared(Vec<(String, String, Value)>),
67    /// A single tool that must execute alone (exclusive).
68    Exclusive((String, String, Value)),
69}
70
71/// Plan how to execute tool calls this turn (sequencing only).
72pub trait ToolExecutionPlanner: Send + Sync {
73    /// Plan execution batches from an ordered list of tool calls. The planner
74    /// must preserve relative order of application and introduce parallelism
75    /// only for Shared batches. The decider classifies each tool.
76    fn plan(
77        &self,
78        tool_uses: &[(String, String, Value)],
79        decider: &dyn ConcurrencyDecider,
80    ) -> Vec<BatchItem>;
81}
82
83/// Barrier planner: batches shared tools; flushes on exclusive.
84pub struct BarrierPlanner;
85impl ToolExecutionPlanner for BarrierPlanner {
86    fn plan(
87        &self,
88        tool_uses: &[(String, String, Value)],
89        decider: &dyn ConcurrencyDecider,
90    ) -> Vec<BatchItem> {
91        let mut out = Vec::new();
92        let mut pending_shared: Vec<(String, String, Value)> = Vec::new();
93        for (id, name, input) in tool_uses.iter().cloned() {
94            match decider.concurrency(&name) {
95                Concurrency::Shared => pending_shared.push((id, name, input)),
96                Concurrency::Exclusive => {
97                    if !pending_shared.is_empty() {
98                        out.push(BatchItem::Shared(std::mem::take(&mut pending_shared)));
99                    }
100                    out.push(BatchItem::Exclusive((id, name, input)));
101                }
102            }
103        }
104        if !pending_shared.is_empty() {
105            out.push(BatchItem::Shared(pending_shared));
106        }
107        out
108    }
109}
110
111/// Contract for executing planned batches.
112///
113/// Narrow by design: this concerns only execution of tool invocations with
114/// concurrency semantics established by the planner. It does not include
115/// operator concerns such as streaming, hooks, tracing, or budget control.
116pub trait BatchExecutor: Send + Sync {
117    /// Execute a full plan using the provided runner for individual tool calls.
118    /// The runner receives (id, name, input_json) and returns an output value.
119    fn exec_batches<F, O, E>(&self, plan: Vec<BatchItem>, f: F) -> Result<Vec<(String, O)>, E>
120    where
121        F: FnMut(String, String, Value) -> Result<O, E> + Send;
122}
123
124/// Baseline sequential executor: executes all tool calls in order, without
125/// introducing any parallelism (Shared batches are executed one-by-one).
126#[derive(Default, Debug, Clone, Copy)]
127pub struct SequentialBatchExecutor;
128
129impl BatchExecutor for SequentialBatchExecutor {
130    fn exec_batches<F, O, E>(&self, plan: Vec<BatchItem>, mut f: F) -> Result<Vec<(String, O)>, E>
131    where
132        F: FnMut(String, String, Value) -> Result<O, E> + Send,
133    {
134        let mut outputs = Vec::new();
135        for item in plan {
136            match item {
137                BatchItem::Exclusive((id, name, input)) => {
138                    let out = f(id.clone(), name, input)?;
139                    outputs.push((id, out));
140                }
141                BatchItem::Shared(batch) => {
142                    for (id, name, input) in batch {
143                        let out = f(id.clone(), name, input)?;
144                        outputs.push((id, out));
145                    }
146                }
147            }
148        }
149        Ok(outputs)
150    }
151}
152
153#[cfg(test)]
154mod tests {
155    use super::*;
156
157    struct Decider;
158    impl ConcurrencyDecider for Decider {
159        fn concurrency(&self, tool_name: &str) -> Concurrency {
160            if tool_name.starts_with("s_") {
161                Concurrency::Shared
162            } else {
163                Concurrency::Exclusive
164            }
165        }
166    }
167
168    #[test]
169    fn plans_with_barrier() {
170        let calls = vec![
171            ("1".into(), "s_a".into(), serde_json::json!({})),
172            ("2".into(), "x".into(), serde_json::json!({})),
173            ("3".into(), "s_b".into(), serde_json::json!({})),
174            ("4".into(), "s_c".into(), serde_json::json!({})),
175        ];
176        let plan = BarrierPlanner.plan(&calls, &Decider);
177        assert!(matches!(plan[0], BatchItem::Shared(_)));
178        assert!(matches!(plan[1], BatchItem::Exclusive(_)));
179        assert!(matches!(plan[2], BatchItem::Shared(_)));
180    }
181
182    #[test]
183    fn sequential_executor_executes_in_order() {
184        let calls = vec![
185            ("1".into(), "s_a".into(), serde_json::json!({})),
186            ("2".into(), "x".into(), serde_json::json!({})),
187        ];
188        let plan = BarrierPlanner.plan(&calls, &Decider);
189        let exec = SequentialBatchExecutor;
190        let out = exec
191            .exec_batches(plan, |id, name, _| Ok::<_, ()>((name.clone(), id.clone())))
192            .unwrap();
193        assert_eq!(out.len(), 2);
194        assert_eq!(out[0].0, "1");
195        assert_eq!(out[1].0, "2");
196    }
197
198    struct EmptySteering;
199    impl SteeringSource for EmptySteering {
200        fn drain(&self) -> Vec<neuron_turn::types::ProviderMessage> {
201            Vec::new()
202        }
203    }
204
205    #[test]
206    fn steering_source_compiles() {
207        let s = EmptySteering;
208        let drained = s.drain();
209        assert!(drained.is_empty());
210    }
211}