knowdit_kg/agent_runner.rs
1//! Generic primitives for "tool-call agent loops" used across the KG learn
2//! pipeline (categorize / extract semantics / extract findings / merge
3//! semantics / merge findings).
4//!
5//! ## Data flow
6//!
7//! Each LLM step previously asked the model to emit one big JSON document
8//! per chunk. That worked for tiny chunks but broke down when a single
9//! response would have to enumerate dozens of items: the model occasionally
10//! emits structurally invalid JSON (forgets a `,`, doubles a key, miscloses
11//! a bracket), and the entire chunk's worth of work is wasted.
12//!
13//! The replacement: spin up one `llmy::harness::Agent` per chunk (without
14//! memory) and give it exactly two tools:
15//!
16//! - **emit-one-item** — the agent calls this once per record it has
17//! produced (one DeFi semantic, one finding, one merge decision, …). The
18//! tool implementation simply pushes the record into an
19//! [`AgentChunkBuffer`].
20//! - **finalize** — the agent calls this when the chunk is fully covered.
21//! That terminates the step loop on this side; we then drain the buffer
22//! and ship the items downstream.
23//!
24//! No JSON parsing of free-form responses; no "max-N-entries-per-response"
25//! tricks; one call per item, so partial progress is never thrown away.
26//!
27//! This module supplies the *plumbing*: the generic per-item buffer, the
28//! step-loop driver, and the run options. Per-workflow tools (with their
29//! workflow-specific argument shapes) live in [`crate::agents`].
30
31use std::sync::Arc;
32
33use llmy::agent::StepResult;
34use llmy::agent::tool::ToolBox;
35use llmy::client::client::LLM;
36use llmy::client::settings::LLMSettings;
37use llmy::harness::Agent;
38use thiserror::Error;
39use tokio::sync::Mutex;
40
41use crate::error::{KgError, Result};
42
43/// Tunables shared by every agent-style chunk run.
44///
45/// Surfaced 1:1 from CLI by the caller; this struct does NOT read any
46/// `std::env` variable. The `Default` impl exists only as a single
47/// well-known starting point for code paths that haven't been updated to
48/// thread the CLI knob through (currently: the deprecated `knowdit-spec`
49/// crate). Production callers should always override via
50/// [`AgentRunOptions::new`].
51#[derive(Debug, Clone)]
52pub struct AgentRunOptions {
53 /// Hard cap on the number of agent steps before we force-abandon the
54 /// chunk. The initial `step_with_user` counts as step 1. Values < 1
55 /// are clamped to 1.
56 pub max_agent_steps: usize,
57 /// Optional debug prefix forwarded to llmy. When set, llmy persists
58 /// every request/response pair under
59 /// `$LLM_DEBUG/<pid>/<prefix>-<NNN>.{json,xml}`.
60 pub debug_prefix: Option<String>,
61 /// Optional per-call llmy settings. When `None`, llmy reads the
62 /// process-level defaults (which the parent CLI populated from clap).
63 pub llm_settings: Option<LLMSettings>,
64}
65
66impl Default for AgentRunOptions {
67 fn default() -> Self {
68 // Single source of truth for the fallback step cap. Keep in sync
69 // with the clap default in `MergeCliArgs::max_agent_steps` /
70 // `ExtractSemanticsArgs::max_agent_steps` (both default to 60).
71 Self::new(60)
72 }
73}
74
75impl AgentRunOptions {
76 pub fn new(max_agent_steps: usize) -> Self {
77 Self {
78 max_agent_steps: max_agent_steps.max(1),
79 debug_prefix: None,
80 llm_settings: None,
81 }
82 }
83
84 pub fn with_debug_prefix(mut self, prefix: impl Into<String>) -> Self {
85 self.debug_prefix = Some(prefix.into());
86 self
87 }
88
89 pub fn with_llm_settings(mut self, settings: LLMSettings) -> Self {
90 self.llm_settings = Some(settings);
91 self
92 }
93
94 /// Build a child `AgentRunOptions` with `debug_prefix` extended by
95 /// `scope`. Clones the rest of the fields so the original options
96 /// stays usable for sibling phases.
97 pub fn scoped(&self, scope: &str) -> Self {
98 let prefix = match self.debug_prefix.as_deref() {
99 Some(p) if !p.is_empty() => format!("{p}-{scope}"),
100 _ => scope.to_string(),
101 };
102 Self {
103 max_agent_steps: self.max_agent_steps,
104 debug_prefix: Some(prefix),
105 llm_settings: self.llm_settings.clone(),
106 }
107 }
108}
109
/// Result summary returned by [`AgentChunkRunner::run`] (or the workflow
/// runners built on top of it).
#[derive(Debug, Clone)]
pub struct AgentRunOutcome {
    /// How many agent steps were consumed. Always >= 1, because the
    /// initial `step_with_user` call is counted.
    pub steps: usize,
    /// `Some(reason)` when the loop ended without the finalize tool being
    /// invoked: either the agent stopped without calling finalize, or it
    /// ran past `max_agent_steps`. Callers may treat this as a warning,
    /// since items already in the buffer remain valid.
    pub early_stop_reason: Option<String>,
    /// Free-form summary the agent handed to the finalize tool. `None`
    /// when finalize was never called or was called without a summary.
    pub finalize_summary: Option<String>,
}
125
126/// Generic per-chunk accumulator. Tool implementations push items in via
127/// [`AgentChunkBuffer::push`]; the matching finalize tool calls
128/// [`AgentChunkBuffer::finalize`] to break the agent step loop. The driver
129/// drains items via [`AgentChunkBuffer::drain`] once the loop exits.
130///
131/// `T` is the workflow's record type (e.g. `ExtractedSemantic`,
132/// `ExtractedFinding`, `MergeDecisionRecord`, …).
133#[derive(Debug)]
134pub struct AgentChunkBuffer<T> {
135 items: Mutex<Vec<T>>,
136 state: Mutex<BufferState>,
137}
138
/// Finalization bookkeeping for an [`AgentChunkBuffer`]. The default
/// value is "not finalized, no summary".
#[derive(Debug, Clone, Default)]
struct BufferState {
    /// Set once the finalize call has succeeded.
    finalized: bool,
    /// Summary supplied with the finalize call, if any.
    finalize_summary: Option<String>,
}
144
145#[derive(Debug, Error)]
146pub enum AgentBufferError {
147 #[error("buffer is already finalized; subsequent tool calls are ignored")]
148 AlreadyFinalized,
149}
150
151impl<T> Default for AgentChunkBuffer<T> {
152 fn default() -> Self {
153 Self {
154 items: Mutex::new(Vec::new()),
155 state: Mutex::new(BufferState::default()),
156 }
157 }
158}
159
160impl<T> AgentChunkBuffer<T> {
161 pub fn new() -> Arc<Self> {
162 Arc::new(Self::default())
163 }
164
165 /// Append a record. Returns an error (visible to the agent as a tool
166 /// result string) if the buffer has already been finalized.
167 pub async fn push(&self, item: T) -> std::result::Result<(), AgentBufferError> {
168 if self.state.lock().await.finalized {
169 return Err(AgentBufferError::AlreadyFinalized);
170 }
171 self.items.lock().await.push(item);
172 Ok(())
173 }
174
175 /// Push and synthesize the tool-result string in one shot. `label`
176 /// names the record kind (e.g. `"semantic"`) for the success message.
177 /// On failure (already finalized) returns the error message intended
178 /// for the LLM.
179 pub async fn push_with_message(&self, item: T, label: &str) -> String {
180 match self.push(item).await {
181 Ok(()) => format!("ok: recorded {label} #{}", self.len().await),
182 Err(err) => format!("error: {err}"),
183 }
184 }
185
186 /// Mark the buffer finalized; subsequent `push` and `finalize` calls
187 /// will fail. The agent's step loop polls [`Self::is_finalized`] to
188 /// decide when to exit.
189 pub async fn finalize(
190 &self,
191 summary: Option<String>,
192 ) -> std::result::Result<(), AgentBufferError> {
193 let mut state = self.state.lock().await;
194 if state.finalized {
195 return Err(AgentBufferError::AlreadyFinalized);
196 }
197 state.finalized = true;
198 state.finalize_summary = summary;
199 Ok(())
200 }
201
202 /// Finalize and synthesize the tool-result string in one shot. `label`
203 /// names the workflow (e.g. `"semantic extraction"`) for the success
204 /// message.
205 pub async fn finalize_with_message(&self, summary: Option<String>, label: &str) -> String {
206 match self.finalize(summary).await {
207 Ok(()) => format!(
208 "ok: {label} finalized with {} item(s). Stop now.",
209 self.len().await
210 ),
211 Err(err) => format!("error: {err}"),
212 }
213 }
214
215 pub async fn is_finalized(&self) -> bool {
216 self.state.lock().await.finalized
217 }
218
219 /// Snapshot the count without taking the items.
220 pub async fn len(&self) -> usize {
221 self.items.lock().await.len()
222 }
223
224 /// Move all collected items out of the buffer. The buffer is left
225 /// empty (but still in finalized-state); intended to be called once,
226 /// after the step loop ends.
227 pub async fn drain(&self) -> Vec<T> {
228 std::mem::take(&mut *self.items.lock().await)
229 }
230
231 /// Return the finalize summary the agent reported (if the agent
232 /// actually called the finalize tool). Cloned so the buffer can stay
233 /// borrowed.
234 pub async fn finalize_summary(&self) -> Option<String> {
235 self.state.lock().await.finalize_summary.clone()
236 }
237}
238
239/// Step-loop driver. Caller assembles the [`ToolBox`] (which must include
240/// at least one finalize-capable tool that flips the buffer's `finalized`
241/// flag) and the system / user prompts; this method spins up an agent
242/// without memory and pumps it until the buffer is finalized, the agent
243/// stops on its own, or `max_agent_steps` is exhausted.
244///
245/// `LLM` is `Clone`, `AgentRunOptions` is `Clone`, and prompts/keys are
246/// already owned `String`s — no lifetimes needed.
247pub struct AgentChunkRunner<T> {
248 pub llm: LLM,
249 pub options: AgentRunOptions,
250 pub buffer: Arc<AgentChunkBuffer<T>>,
251 pub tools: ToolBox,
252 pub system_prompt: String,
253 pub user_prompt: String,
254 pub cache_key: String,
255 pub label: String,
256}
257
258impl<T> AgentChunkRunner<T> {
259 pub async fn run(self) -> Result<AgentRunOutcome> {
260 let AgentChunkRunner {
261 llm,
262 options,
263 buffer,
264 tools,
265 system_prompt,
266 user_prompt,
267 cache_key,
268 label,
269 } = self;
270
271 let mut agent = Agent::new(system_prompt, tools, cache_key);
272
273 let initial = agent
274 .step_with_user(
275 user_prompt,
276 &llm,
277 options.debug_prefix.as_deref(),
278 options.llm_settings.clone(),
279 )
280 .await
281 .map_err(|err| KgError::other(format!("{label} agent initial step failed: {err}")))?;
282
283 let mut step = initial;
284 let mut steps: usize = 1;
285 let mut early_stop: Option<String> = None;
286
287 while !buffer.is_finalized().await {
288 if matches!(step, StepResult::Stop(_)) {
289 early_stop = Some("agent stopped without calling finalize".to_string());
290 break;
291 }
292 if steps >= options.max_agent_steps {
293 early_stop = Some(format!(
294 "agent exceeded max_agent_steps={}",
295 options.max_agent_steps
296 ));
297 break;
298 }
299 step = agent
300 .step(
301 &llm,
302 options.debug_prefix.as_deref(),
303 options.llm_settings.clone(),
304 )
305 .await
306 .map_err(|err| {
307 KgError::other(format!("{label} agent step #{steps} failed: {err}"))
308 })?;
309 steps += 1;
310 }
311
312 let summary = buffer.finalize_summary().await;
313 if let Some(reason) = early_stop.as_ref() {
314 tracing::warn!(
315 "{} agent: {} (collected {} item(s))",
316 label,
317 reason,
318 buffer.len().await,
319 );
320 }
321 Ok(AgentRunOutcome {
322 steps,
323 early_stop_reason: early_stop,
324 finalize_summary: summary,
325 })
326 }
327}