// neuron_turn_kit/lib.rs
1//! neuron-turn-kit
2//!
3//! Planning and execution primitives for a single agent turn — focused on
4//! sequencing and concurrency only. These traits are intentionally narrow and
5//! do not bake in provider streaming, hooks, or operator concerns; those live
6//! at higher layers (e.g., operator implementations).
7//!
8//! Contents:
9//! - Concurrency and ConcurrencyDecider — classify tool calls as Shared vs Exclusive
10//! - ToolExecutionPlanner and BarrierPlanner — sequence tool calls into batches
11//! - SteeringSource — optional source of mid-loop steering messages
12//! - BatchExecutor — run batches, with a simple sequential baseline executor
13//!
14//! Example: planning with a barrier.
15//! ```rust
16//! use neuron_turn_kit::{BarrierPlanner, Concurrency, ConcurrencyDecider, ToolExecutionPlanner};
17//!
18//! struct SharedIfStartsWith;
19//! impl ConcurrencyDecider for SharedIfStartsWith {
20//! fn concurrency(&self, tool_name: &str) -> Concurrency {
21//! if tool_name.starts_with("shared_") { Concurrency::Shared } else { Concurrency::Exclusive }
22//! }
23//! }
24//!
25//! let calls = vec![
26//! ("1".to_string(), "shared_a".to_string(), serde_json::json!({})),
27//! ("2".to_string(), "exclusive".to_string(), serde_json::json!({})),
28//! ("3".to_string(), "shared_b".to_string(), serde_json::json!({})),
29//! ];
30//! let planner = BarrierPlanner;
31//! let plan = planner.plan(&calls, &SharedIfStartsWith);
32//! assert!(matches!(plan[0], neuron_turn_kit::BatchItem::Shared(_)));
33//! ```
34
35use serde_json::Value;
36
/// Concurrency hint for tool scheduling (strategy-defined).
///
/// `Hash` is derived alongside `PartialEq`/`Eq` so strategies can key
/// scheduling tables (e.g. `HashMap<Concurrency, _>`) by class; for a
/// fieldless two-variant enum this is free and backward compatible.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Concurrency {
    /// Safe to run alongside other shared tools in the same batch.
    Shared,
    /// Must run alone (barrier before and after).
    Exclusive,
}
45
/// Decide concurrency for a tool by name.
///
/// Implementations classify each tool as [`Concurrency::Shared`] or
/// [`Concurrency::Exclusive`]; planners (e.g. [`BarrierPlanner`]) call this
/// once per tool call while building batches, so implementations should be
/// cheap and deterministic for a given `tool_name`. `Send + Sync` so a single
/// decider can be shared across threads.
pub trait ConcurrencyDecider: Send + Sync {
    /// Return the concurrency class for a tool by name.
    fn concurrency(&self, tool_name: &str) -> Concurrency;
}
51
/// Optional source of steering messages to inject mid-loop (provider-formatted).
///
/// Boundary: this is not a "hook" — it does not inspect internal state or
/// events. It is a narrow bridge for external steering signals only.
pub trait SteeringSource: Send + Sync {
    /// Drain any available provider-formatted messages to inject.
    ///
    /// An empty `Vec` means there is nothing to inject right now. Note that
    /// this takes `&self`, so an implementation backed by a queue needs
    /// interior mutability (e.g. a channel receiver or a mutex-guarded deque).
    fn drain(&self) -> Vec<neuron_turn::types::ProviderMessage>;
}
60
61/// Planned batches for a turn.
62#[derive(Debug, Clone)]
63pub enum BatchItem {
64 /// Batch of tools that may execute in parallel (shared).
65 /// Each entry is (id, name, input_json).
66 Shared(Vec<(String, String, Value)>),
67 /// A single tool that must execute alone (exclusive).
68 Exclusive((String, String, Value)),
69}
70
71/// Plan how to execute tool calls this turn (sequencing only).
72pub trait ToolExecutionPlanner: Send + Sync {
73 /// Plan execution batches from an ordered list of tool calls. The planner
74 /// must preserve relative order of application and introduce parallelism
75 /// only for Shared batches. The decider classifies each tool.
76 fn plan(
77 &self,
78 tool_uses: &[(String, String, Value)],
79 decider: &dyn ConcurrencyDecider,
80 ) -> Vec<BatchItem>;
81}
82
83/// Barrier planner: batches shared tools; flushes on exclusive.
84pub struct BarrierPlanner;
85impl ToolExecutionPlanner for BarrierPlanner {
86 fn plan(
87 &self,
88 tool_uses: &[(String, String, Value)],
89 decider: &dyn ConcurrencyDecider,
90 ) -> Vec<BatchItem> {
91 let mut out = Vec::new();
92 let mut pending_shared: Vec<(String, String, Value)> = Vec::new();
93 for (id, name, input) in tool_uses.iter().cloned() {
94 match decider.concurrency(&name) {
95 Concurrency::Shared => pending_shared.push((id, name, input)),
96 Concurrency::Exclusive => {
97 if !pending_shared.is_empty() {
98 out.push(BatchItem::Shared(std::mem::take(&mut pending_shared)));
99 }
100 out.push(BatchItem::Exclusive((id, name, input)));
101 }
102 }
103 }
104 if !pending_shared.is_empty() {
105 out.push(BatchItem::Shared(pending_shared));
106 }
107 out
108 }
109}
110
/// Contract for executing planned batches.
///
/// Narrow by design: this concerns only execution of tool invocations with
/// concurrency semantics established by the planner. It does not include
/// operator concerns such as streaming, hooks, tracing, or budget control.
///
/// NOTE(review): `exec_batches` is generic, so this trait is not object-safe;
/// use it as a generic bound rather than behind `dyn BatchExecutor`.
pub trait BatchExecutor: Send + Sync {
    /// Execute a full plan using the provided runner for individual tool calls.
    /// The runner receives (id, name, input_json) and returns an output value.
    ///
    /// The expected contract is one `(id, output)` pair per executed call, or
    /// the first `Err` produced by the runner (short-circuiting), as in
    /// [`SequentialBatchExecutor`].
    fn exec_batches<F, O, E>(&self, plan: Vec<BatchItem>, f: F) -> Result<Vec<(String, O)>, E>
    where
        F: FnMut(String, String, Value) -> Result<O, E> + Send;
}
123
124/// Baseline sequential executor: executes all tool calls in order, without
125/// introducing any parallelism (Shared batches are executed one-by-one).
126#[derive(Default, Debug, Clone, Copy)]
127pub struct SequentialBatchExecutor;
128
129impl BatchExecutor for SequentialBatchExecutor {
130 fn exec_batches<F, O, E>(&self, plan: Vec<BatchItem>, mut f: F) -> Result<Vec<(String, O)>, E>
131 where
132 F: FnMut(String, String, Value) -> Result<O, E> + Send,
133 {
134 let mut outputs = Vec::new();
135 for item in plan {
136 match item {
137 BatchItem::Exclusive((id, name, input)) => {
138 let out = f(id.clone(), name, input)?;
139 outputs.push((id, out));
140 }
141 BatchItem::Shared(batch) => {
142 for (id, name, input) in batch {
143 let out = f(id.clone(), name, input)?;
144 outputs.push((id, out));
145 }
146 }
147 }
148 }
149 Ok(outputs)
150 }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    struct Decider;
    impl ConcurrencyDecider for Decider {
        fn concurrency(&self, tool_name: &str) -> Concurrency {
            // "s_" prefix marks a shared tool; everything else is exclusive.
            match tool_name.strip_prefix("s_") {
                Some(_) => Concurrency::Shared,
                None => Concurrency::Exclusive,
            }
        }
    }

    #[test]
    fn plans_with_barrier() {
        let tool_calls = vec![
            ("1".into(), "s_a".into(), serde_json::json!({})),
            ("2".into(), "x".into(), serde_json::json!({})),
            ("3".into(), "s_b".into(), serde_json::json!({})),
            ("4".into(), "s_c".into(), serde_json::json!({})),
        ];
        let batches = BarrierPlanner.plan(&tool_calls, &Decider);
        // Expect: shared [1], exclusive 2, shared [3, 4].
        assert!(matches!(batches[0], BatchItem::Shared(_)));
        assert!(matches!(batches[1], BatchItem::Exclusive(_)));
        assert!(matches!(batches[2], BatchItem::Shared(_)));
    }

    #[test]
    fn sequential_executor_executes_in_order() {
        let tool_calls = vec![
            ("1".into(), "s_a".into(), serde_json::json!({})),
            ("2".into(), "x".into(), serde_json::json!({})),
        ];
        let batches = BarrierPlanner.plan(&tool_calls, &Decider);
        let outputs = SequentialBatchExecutor
            .exec_batches(batches, |id, name, _| Ok::<_, ()>((name, id)))
            .unwrap();
        assert_eq!(outputs.len(), 2);
        assert_eq!(outputs[0].0, "1");
        assert_eq!(outputs[1].0, "2");
    }

    struct EmptySteering;
    impl SteeringSource for EmptySteering {
        fn drain(&self) -> Vec<neuron_turn::types::ProviderMessage> {
            Vec::new()
        }
    }

    #[test]
    fn steering_source_compiles() {
        let source = EmptySteering;
        assert!(source.drain().is_empty());
    }
}