Skip to main content

knowdit_kg/
agent_runner.rs

//! Generic primitives for "tool-call agent loops" used across the KG learn
//! pipeline (categorize / extract semantics / extract findings / merge
//! semantics / merge findings).
//!
//! ## Data flow
//!
//! Each LLM step previously asked the model to emit one big JSON document
//! per chunk. That worked for tiny chunks but broke down when a single
//! response would have to enumerate dozens of items: the model occasionally
//! emits structurally invalid JSON (forgets a `,`, doubles a key, miscloses
//! a bracket), and the entire chunk's worth of work is wasted.
//!
//! The replacement: spin up one `llmy::harness::Agent` per chunk (without
//! memory) and give it exactly two tools:
//!
//! - **emit-one-item** — the agent calls this once per record it has
//!   produced (one DeFi semantic, one finding, one merge decision, …). The
//!   tool implementation simply pushes the record into an
//!   [`AgentChunkBuffer`].
//! - **finalize** — the agent calls this when the chunk is fully covered.
//!   That terminates the step loop on this side; we then drain the buffer
//!   and ship the items downstream.
//!
//! No JSON parsing of free-form responses; no "max-N-entries-per-response"
//! tricks; one call per item, so partial progress is never thrown away.
//!
//! This module supplies the *plumbing*: the generic per-item buffer, the
//! step-loop driver, and the run options. Per-workflow tools (with their
//! workflow-specific argument shapes) live in [`crate::agents`].
30
31use std::sync::Arc;
32
33use llmy::agent::StepResult;
34use llmy::agent::tool::ToolBox;
35use llmy::client::client::LLM;
36use llmy::client::settings::LLMSettings;
37use llmy::harness::Agent;
38use thiserror::Error;
39use tokio::sync::Mutex;
40
41use crate::error::{KgError, Result};
42
43/// Tunables shared by every agent-style chunk run.
44///
45/// Surfaced 1:1 from CLI by the caller; this struct does NOT read any
46/// `std::env` variable. The `Default` impl exists only as a single
47/// well-known starting point for code paths that haven't been updated to
48/// thread the CLI knob through (currently: the deprecated `knowdit-spec`
49/// crate). Production callers should always override via
50/// [`AgentRunOptions::new`].
51#[derive(Debug, Clone)]
52pub struct AgentRunOptions {
53    /// Hard cap on the number of agent steps before we force-abandon the
54    /// chunk. The initial `step_with_user` counts as step 1. Values < 1
55    /// are clamped to 1.
56    pub max_agent_steps: usize,
57    /// Optional debug prefix forwarded to llmy. When set, llmy persists
58    /// every request/response pair under
59    /// `$LLM_DEBUG/<pid>/<prefix>-<NNN>.{json,xml}`.
60    pub debug_prefix: Option<String>,
61    /// Optional per-call llmy settings. When `None`, llmy reads the
62    /// process-level defaults (which the parent CLI populated from clap).
63    pub llm_settings: Option<LLMSettings>,
64}
65
66impl Default for AgentRunOptions {
67    fn default() -> Self {
68        // Single source of truth for the fallback step cap. Keep in sync
69        // with the clap default in `MergeCliArgs::max_agent_steps` /
70        // `ExtractSemanticsArgs::max_agent_steps` (both default to 60).
71        Self::new(60)
72    }
73}
74
75impl AgentRunOptions {
76    pub fn new(max_agent_steps: usize) -> Self {
77        Self {
78            max_agent_steps: max_agent_steps.max(1),
79            debug_prefix: None,
80            llm_settings: None,
81        }
82    }
83
84    pub fn with_debug_prefix(mut self, prefix: impl Into<String>) -> Self {
85        self.debug_prefix = Some(prefix.into());
86        self
87    }
88
89    pub fn with_llm_settings(mut self, settings: LLMSettings) -> Self {
90        self.llm_settings = Some(settings);
91        self
92    }
93
94    /// Build a child `AgentRunOptions` with `debug_prefix` extended by
95    /// `scope`. Clones the rest of the fields so the original options
96    /// stays usable for sibling phases.
97    pub fn scoped(&self, scope: &str) -> Self {
98        let prefix = match self.debug_prefix.as_deref() {
99            Some(p) if !p.is_empty() => format!("{p}-{scope}"),
100            _ => scope.to_string(),
101        };
102        Self {
103            max_agent_steps: self.max_agent_steps,
104            debug_prefix: Some(prefix),
105            llm_settings: self.llm_settings.clone(),
106        }
107    }
108}
109
/// Result summary produced by [`AgentChunkRunner::run`] (and by the
/// workflow runners built on top of it).
#[derive(Clone, Debug)]
pub struct AgentRunOutcome {
    /// How many agent steps were used. Always at least 1, since the
    /// initial `step_with_user` call is counted.
    pub steps: usize,
    /// Set to `Some(reason)` when the loop finished without the agent
    /// calling the finalize tool — typically because the agent stopped on
    /// its own or ran past `max_agent_steps`. Callers are free to treat
    /// this as a warning: whatever is already in the buffer is still
    /// valid.
    pub early_stop_reason: Option<String>,
    /// The free-form summary the agent handed to the finalize tool, when
    /// that tool was in fact invoked.
    pub finalize_summary: Option<String>,
}
125
126/// Generic per-chunk accumulator. Tool implementations push items in via
127/// [`AgentChunkBuffer::push`]; the matching finalize tool calls
128/// [`AgentChunkBuffer::finalize`] to break the agent step loop. The driver
129/// drains items via [`AgentChunkBuffer::drain`] once the loop exits.
130///
131/// `T` is the workflow's record type (e.g. `ExtractedSemantic`,
132/// `ExtractedFinding`, `MergeDecisionRecord`, …).
133#[derive(Debug)]
134pub struct AgentChunkBuffer<T> {
135    items: Mutex<Vec<T>>,
136    state: Mutex<BufferState>,
137}
138
/// Finalization bookkeeping for an `AgentChunkBuffer`. The default value
/// is "not finalized, no summary".
#[derive(Clone, Debug, Default)]
struct BufferState {
    /// `true` once the buffer has been finalized.
    finalized: bool,
    /// Summary passed in at finalize time, if any.
    finalize_summary: Option<String>,
}
144
145#[derive(Debug, Error)]
146pub enum AgentBufferError {
147    #[error("buffer is already finalized; subsequent tool calls are ignored")]
148    AlreadyFinalized,
149}
150
151impl<T> Default for AgentChunkBuffer<T> {
152    fn default() -> Self {
153        Self {
154            items: Mutex::new(Vec::new()),
155            state: Mutex::new(BufferState::default()),
156        }
157    }
158}
159
160impl<T> AgentChunkBuffer<T> {
161    pub fn new() -> Arc<Self> {
162        Arc::new(Self::default())
163    }
164
165    /// Append a record. Returns an error (visible to the agent as a tool
166    /// result string) if the buffer has already been finalized.
167    pub async fn push(&self, item: T) -> std::result::Result<(), AgentBufferError> {
168        if self.state.lock().await.finalized {
169            return Err(AgentBufferError::AlreadyFinalized);
170        }
171        self.items.lock().await.push(item);
172        Ok(())
173    }
174
175    /// Push and synthesize the tool-result string in one shot. `label`
176    /// names the record kind (e.g. `"semantic"`) for the success message.
177    /// On failure (already finalized) returns the error message intended
178    /// for the LLM.
179    pub async fn push_with_message(&self, item: T, label: &str) -> String {
180        match self.push(item).await {
181            Ok(()) => format!("ok: recorded {label} #{}", self.len().await),
182            Err(err) => format!("error: {err}"),
183        }
184    }
185
186    /// Mark the buffer finalized; subsequent `push` and `finalize` calls
187    /// will fail. The agent's step loop polls [`Self::is_finalized`] to
188    /// decide when to exit.
189    pub async fn finalize(
190        &self,
191        summary: Option<String>,
192    ) -> std::result::Result<(), AgentBufferError> {
193        let mut state = self.state.lock().await;
194        if state.finalized {
195            return Err(AgentBufferError::AlreadyFinalized);
196        }
197        state.finalized = true;
198        state.finalize_summary = summary;
199        Ok(())
200    }
201
202    /// Finalize and synthesize the tool-result string in one shot. `label`
203    /// names the workflow (e.g. `"semantic extraction"`) for the success
204    /// message.
205    pub async fn finalize_with_message(&self, summary: Option<String>, label: &str) -> String {
206        match self.finalize(summary).await {
207            Ok(()) => format!(
208                "ok: {label} finalized with {} item(s). Stop now.",
209                self.len().await
210            ),
211            Err(err) => format!("error: {err}"),
212        }
213    }
214
215    pub async fn is_finalized(&self) -> bool {
216        self.state.lock().await.finalized
217    }
218
219    /// Snapshot the count without taking the items.
220    pub async fn len(&self) -> usize {
221        self.items.lock().await.len()
222    }
223
224    /// Move all collected items out of the buffer. The buffer is left
225    /// empty (but still in finalized-state); intended to be called once,
226    /// after the step loop ends.
227    pub async fn drain(&self) -> Vec<T> {
228        std::mem::take(&mut *self.items.lock().await)
229    }
230
231    /// Return the finalize summary the agent reported (if the agent
232    /// actually called the finalize tool). Cloned so the buffer can stay
233    /// borrowed.
234    pub async fn finalize_summary(&self) -> Option<String> {
235        self.state.lock().await.finalize_summary.clone()
236    }
237}
238
239/// Step-loop driver. Caller assembles the [`ToolBox`] (which must include
240/// at least one finalize-capable tool that flips the buffer's `finalized`
241/// flag) and the system / user prompts; this method spins up an agent
242/// without memory and pumps it until the buffer is finalized, the agent
243/// stops on its own, or `max_agent_steps` is exhausted.
244///
245/// `LLM` is `Clone`, `AgentRunOptions` is `Clone`, and prompts/keys are
246/// already owned `String`s — no lifetimes needed.
247pub struct AgentChunkRunner<T> {
248    pub llm: LLM,
249    pub options: AgentRunOptions,
250    pub buffer: Arc<AgentChunkBuffer<T>>,
251    pub tools: ToolBox,
252    pub system_prompt: String,
253    pub user_prompt: String,
254    pub cache_key: String,
255    pub label: String,
256}
257
258impl<T> AgentChunkRunner<T> {
259    pub async fn run(self) -> Result<AgentRunOutcome> {
260        let AgentChunkRunner {
261            llm,
262            options,
263            buffer,
264            tools,
265            system_prompt,
266            user_prompt,
267            cache_key,
268            label,
269        } = self;
270
271        let mut agent = Agent::new(system_prompt, tools, cache_key);
272
273        let initial = agent
274            .step_with_user(
275                user_prompt,
276                &llm,
277                options.debug_prefix.as_deref(),
278                options.llm_settings.clone(),
279            )
280            .await
281            .map_err(|err| KgError::other(format!("{label} agent initial step failed: {err}")))?;
282
283        let mut step = initial;
284        let mut steps: usize = 1;
285        let mut early_stop: Option<String> = None;
286
287        while !buffer.is_finalized().await {
288            if matches!(step, StepResult::Stop(_)) {
289                early_stop = Some("agent stopped without calling finalize".to_string());
290                break;
291            }
292            if steps >= options.max_agent_steps {
293                early_stop = Some(format!(
294                    "agent exceeded max_agent_steps={}",
295                    options.max_agent_steps
296                ));
297                break;
298            }
299            step = agent
300                .step(
301                    &llm,
302                    options.debug_prefix.as_deref(),
303                    options.llm_settings.clone(),
304                )
305                .await
306                .map_err(|err| {
307                    KgError::other(format!("{label} agent step #{steps} failed: {err}"))
308                })?;
309            steps += 1;
310        }
311
312        let summary = buffer.finalize_summary().await;
313        if let Some(reason) = early_stop.as_ref() {
314            tracing::warn!(
315                "{} agent: {} (collected {} item(s))",
316                label,
317                reason,
318                buffer.len().await,
319            );
320        }
321        Ok(AgentRunOutcome {
322            steps,
323            early_stop_reason: early_stop,
324            finalize_summary: summary,
325        })
326    }
327}