klieo-core 0.6.0

Core traits + runtime for the klieo agent framework.
Documentation
//! Runtime loop driving an [`crate::agent::Agent`] through LLM ↔ tool
//! exchanges.
//!
//! This is the narrow primitive every higher layer (graph, actor,
//! spec-aggregate) builds on. It does one thing: alternate LLM
//! completions with tool dispatch, persisting episodic events, until
//! the LLM emits a final text reply, hits `max_steps`, or is cancelled.
//!
//! Module map:
//! - `dispatch` — shared tool-dispatch helper (deduped between drivers).
//! - `guardrails` — `check_pre_llm` / `check_post_llm` dispatch.
//! - `options` — [`RunOptions`] tunables + defaults.
//! - `retry` — bounded exponential-backoff retry policy + constants.
//! - `step` — `record_and_dispatch` + its `StepOutcome` /
//!   `StepDisposition` types.
//! - `streaming` — [`run_steps_streaming`] and its loop driver.
//! - `streaming_forward` — chunk-pump helper for the streaming loop.
//! - `mod.rs` (this file) — [`run_steps`] + shared `build_request`.

mod dispatch;
mod guardrails;
mod options;
mod retry;
mod step;
mod streaming;
mod streaming_forward;

use crate::agent::AgentContext;
use crate::error::Error;
use crate::ids::ThreadId;
use crate::llm::{ChatRequest, Message, Role};
use crate::memory::Episode;
use std::time::Instant;
use tracing::{error, instrument};

use guardrails::{check_post_llm, check_pre_llm};
use retry::complete_with_retry;
use step::{record_and_dispatch, StepDisposition, StepOutcome};

pub use options::RunOptions;
pub use streaming::run_steps_streaming;

/// Drive the agent's LLM/tool loop until completion.
///
/// The caller is responsible for appending the user message before
/// invoking — this function only consumes / extends short-term memory.
///
/// Returns the assistant's final text response.
///
/// **Episode logging.** Records `Episode::Started` on entry, `LlmCall`
/// per cycle, `ToolCall` per dispatched tool, and `Completed` on
/// success. **Does NOT record `Episode::Failed` on errors** — failures
/// propagate via the `Result::Err` return and the caller decides how
/// to log them. This differs from [`run_steps_streaming`], which
/// always records a terminal `Failed` episode because errors cannot
/// flow back through a returned stream.
///
/// # Errors
///
/// - [`Error::Cancelled`] when `ctx.cancel` is observed cancelled at
///   the top of an iteration.
/// - [`Error::MaxStepsExceeded`] once `opts.max_steps` iterations have
///   been started without a final reply (so `max_steps == 0` fails
///   before any LLM call is made).
/// - Any error propagated from episodic recording, request assembly,
///   the pre-/post-LLM guardrail checks, the retried LLM completion,
///   or step recording/dispatch.
#[instrument(level = "debug", skip(ctx, system_prompt), fields(run_id = %ctx.run_id))]
pub async fn run_steps(
    ctx: &AgentContext,
    system_prompt: &str,
    thread: ThreadId,
    opts: RunOptions,
) -> Result<String, Error> {
    ctx.episodic
        .record(
            ctx.run_id,
            Episode::Started {
                agent: ctx.agent_name.clone(),
            },
        )
        .await?;

    let mut step = 0u32;
    loop {
        // Cancellation takes priority over the step budget: both are
        // checked before any work is done for this iteration.
        if ctx.cancel.is_cancelled() {
            error!("cancelled");
            return Err(Error::Cancelled);
        }
        if step >= opts.max_steps {
            return Err(Error::MaxStepsExceeded {
                steps: opts.max_steps,
            });
        }
        step += 1;

        // The request is rebuilt every iteration so it reflects whatever
        // the previous step appended to short-term memory.
        let req = build_request(ctx, system_prompt, &thread, opts.max_history_tokens).await?;

        check_pre_llm(&opts.guardrails, &req).await?;

        // `req` is cloned because the post-LLM guardrail check below
        // still needs to inspect the request that was sent.
        let started = Instant::now();
        let resp = complete_with_retry(ctx.llm.as_ref(), &ctx.cancel, req.clone()).await?;
        // Saturate instead of wrapping: `as u32` would silently truncate
        // the `u128` millisecond count for very long calls.
        let latency_ms = u32::try_from(started.elapsed().as_millis()).unwrap_or(u32::MAX);

        check_post_llm(&opts.guardrails, &req, &resp).await?;

        let outcome = StepOutcome {
            message: resp.message,
            finish_reason: resp.finish_reason,
            usage: resp.usage,
            latency_ms,
        };
        match record_and_dispatch(ctx, &thread, step, outcome, "blocking").await? {
            StepDisposition::Done(content) => return Ok(content),
            StepDisposition::Continue => continue,
        }
    }
}

/// Build a single [`ChatRequest`] from short-term memory + the system
/// prompt + the active tool catalogue. Shared by [`run_steps`] and
/// [`run_steps_streaming`] so prompt-assembly stays identical between
/// paths.
pub(super) async fn build_request(
    ctx: &AgentContext,
    system_prompt: &str,
    thread: &ThreadId,
    max_history_tokens: usize,
) -> Result<ChatRequest, Error> {
    let history = ctx
        .short_term
        .load(thread.clone(), max_history_tokens)
        .await?;
    let mut messages = Vec::with_capacity(history.len() + 1);
    if !system_prompt.is_empty() {
        messages.push(Message {
            role: Role::System,
            content: system_prompt.into(),
            tool_calls: vec![],
            tool_call_id: None,
        });
    }
    messages.extend(history);

    Ok(ChatRequest {
        messages,
        tools: ctx.tools.catalogue(),
        ..ChatRequest::new(vec![])
    })
}