motosan-agent-loop 0.7.1

Standalone ReAct agent loop — LlmClient + AgentLoop with no platform dependencies

A standalone ReAct agent loop for Rust. It drives any LLM through iterative reasoning and tool execution, producing a final answer — with zero platform dependencies. Bring your own LLM backend (OpenAI, Anthropic, local models, or a test double) by implementing the LlmClient trait, register tools via motosan-agent-tool, and let AgentLoop handle the rest.

Features

Feature Default Description
motosan-ai off LlmClient bridge for the motosan-ai SDK
mcp-client off MCP (Model Context Protocol) server support via rmcp
cancellation off CancellationToken support for graceful loop cancellation

Quick Start

[dependencies]
motosan-agent-loop = "0.7"
motosan-agent-tool = "0.2"
async-trait = "0.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

use async_trait::async_trait;
use motosan_agent_loop::{
    AgentLoop, AgentEvent, ChatOutput, LlmClient, LlmResponse, Message, Result,
};
use motosan_agent_tool::ToolDef;

struct MyLlm;

#[async_trait]
impl LlmClient for MyLlm {
    async fn chat(&self, _messages: &[Message], _tools: &[ToolDef]) -> Result<ChatOutput> {
        Ok(ChatOutput::new(LlmResponse::Message("Hello!".into())))
    }
}

#[tokio::main]
async fn main() {
    let agent = AgentLoop::builder()
        // .tool(my_tool)
        // .context(my_provider)
        .max_iterations(10)
        .build();

    let result = agent
        .run(&MyLlm, vec![Message::user("Hi!")], |event| match &event {
            AgentEvent::ToolStarted { name } => println!("tool: {name}"),
            AgentEvent::ToolCompleted { name, .. } => println!("done: {name}"),
            AgentEvent::TextChunk(t) => print!("{t}"),
            _ => {}
        })
        .await
        .unwrap();

    println!("Answer: {}", result.answer);
    println!("Tokens: {}in/{}out", result.usage.input_tokens, result.usage.output_tokens);
}

Builder API

let agent = AgentLoop::builder()
    .tool(my_tool)                          // register a tool
    .tools(vec![tool_a, tool_b])            // register multiple tools
    .system_prompt("You are helpful.")      // shortcut for a ContextProvider
    .context(my_context_provider)           // dynamic context injection
    .max_iterations(10)                     // default: 10
    .tool_timeout(Duration::from_secs(30))  // per-tool call timeout
    .build();

Streaming

let result = agent
    .run_streaming(&llm, messages, |event| match &event {
        AgentEvent::TextChunk(delta) => print!("{delta}"),
        AgentEvent::TextDone(full) => println!("\n---\n{full}"),
        _ => {}
    })
    .await?;

Override LlmClient::chat_stream() for true streaming; the default falls back to chat().

Interactive Control (run_with_ops)

Use run_with_ops() when you need to interrupt or inject input while a turn is running.

use tokio::sync::mpsc;
use motosan_agent_loop::{AgentOp, AgentEvent};

let (ops_tx, ops_rx) = mpsc::channel(32);
let run = agent.run_with_ops(&llm, messages, Some(ops_rx), |event| {
    if let AgentEvent::AskUser { call_id, question, .. } = event {
        println!("Agent asks: {question}");
        // send answer back from your app/UI thread
        let tx = ops_tx.clone();
        tokio::spawn(async move {
            let _ = tx.send(AgentOp::AskUserAnswer {
                call_id: Some(call_id),
                answer: "Option A".into(),
            }).await;
        });
    }
});

// From another task:
let _ = ops_tx.send(AgentOp::InjectHint("Focus on shorter output".into())).await;
let _ = ops_tx.send(AgentOp::Interrupt).await;
let result = run.await?;

AgentSession (Long-Lived Background Session)

use std::sync::Arc;
use motosan_agent_loop::{AgentSession, MemorySessionStore};

let (session, handle) = AgentSession::new(agent, Arc::new(llm), |_| {});
session.send("Search for Taipei rentals under 30000").await;
session.inject("Prefer listings with elevators").await;
session.interrupt().await;

// Persistent session
let store = Arc::new(MemorySessionStore::new());
let (persistent, _handle2) =
    AgentSession::new_with_store("chat-1", store.clone(), agent2, Arc::new(llm2), |_| {});

// For strict durability guarantees before shutdown:
persistent.flush().await?;
// or:
// persistent.close().await?;

FileSessionStore starts periodic auto-flush only when constructed inside a Tokio runtime. If constructed outside a runtime, call flush() explicitly for durability.

MCP Server Support

Requires the mcp-client feature.

motosan-agent-loop = { version = "0.7", features = ["mcp-client"] }

use motosan_agent_loop::mcp::{McpServerStdio, McpServerHttp};

let agent = AgentLoop::builder()
    .tool(my_tool)
    // Stdio-based MCP server (spawns a child process)
    .mcp_server(McpServerStdio::new("npx", ["-y", "@modelcontextprotocol/server-filesystem"]))
    // HTTP-based MCP server
    .mcp_server(McpServerHttp::new("https://mcp.example.com/sse"))
    .build();

// MCP servers are auto-connected on run() and disconnected after.
let result = agent.run(&llm, messages, |_| {}).await?;

Cancellation

Requires the cancellation feature.

motosan-agent-loop = { version = "0.7", features = ["cancellation"] }

use tokio_util::sync::CancellationToken;

let token = CancellationToken::new();

// Cancel from another task after 5 seconds
let t = token.clone();
tokio::spawn(async move {
    tokio::time::sleep(Duration::from_secs(5)).await;
    t.cancel();
});

let result = agent
    .run_with_cancel(&llm, messages, token, |_| {})
    .await;
// Returns Err(AgentError::Cancelled) if cancelled mid-loop.

motosan-ai Bridge

Requires the motosan-ai feature.

motosan-agent-loop = { version = "0.7", features = ["motosan-ai"] }
motosan-ai = { version = "0.5", features = ["anthropic"] }

use motosan_ai::{Client, Provider};

let client = Client::builder()
    .provider(Provider::Anthropic)
    .api_key(std::env::var("ANTHROPIC_API_KEY")?)
    .build()?;

// Client implements LlmClient — use it directly.
let result = agent.run(&client, messages, |_| {}).await?;

Core Types

Type Description
LlmClient Trait — implement chat() (and optionally chat_stream()) for your LLM backend
ChatOutput Response wrapper with LlmResponse + optional TokenUsage
LlmResponse Message(String) or ToolCalls(Vec<ToolCallItem>)
StreamChunk TextDelta(String), Done(LlmResponse), or Usage(TokenUsage)
AgentLoop The core ReAct loop — call run() or run_streaming()
AgentOp Interactive commands for run_with_ops() (Interrupt, Inject*, AskUserAnswer)
AgentResult Final answer, tool call history, iteration count, token usage, messages
AgentEvent Callback events including streaming/tool events and interactive AskUser/Interrupted
AgentSession Long-lived background session wrapper with managed history
SessionStore Persistence trait for session history (MemorySessionStore, FileSessionStore)
ContextProvider Trait — inject dynamic context (RAG docs, user profiles) into conversations
McpServer Trait — MCP server abstraction (McpServerStdio, McpServerHttp)

Compatibility

Public API Surface

The following types and traits constitute the stable public API. Breaking changes to these will follow semver and be documented in the CHANGELOG.

Category Types
Core loop AgentLoop, AgentLoopBuilder, AgentResult
LLM trait LlmClient, ChatOutput, LlmResponse, StreamChunk, TokenUsage, ToolCallItem
Messages Message, Role, ToolCallRef
Interactive AgentOp, AgentEvent
Session AgentSession, SessionStore, MemorySessionStore, FileSessionStore, SessionMeta
Context ContextProvider
Channels BackpressurePolicy, ChannelConfig
Errors AgentError, Result

Event Ordering Guarantees

Events emitted via the on_event callback follow a deterministic ordering within each iteration:

  1. IterationStarted(n) — emitted once at the start of each LLM round-trip.
  2. ToolStarted { name } — emitted for each tool before execution begins.
  3. ToolCompleted { name, result } — emitted after each tool finishes.
  4. TextChunk(delta) / TextDone(full) — emitted when the LLM produces a final text response (streaming or non-streaming).

Within a parallel tool batch, each tool's ToolStarted precedes its own ToolCompleted, but events from different tools may interleave. All tool completions in a batch precede the next IterationStarted.

For ask_user tool calls, AskUser { call_id, question, options } is emitted between ToolStarted and ToolCompleted. If no answer arrives before the timeout, AskUserTimeout { call_id, question } is emitted.
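The guarantees above can be stated as a checkable predicate over an event log. A self-contained sketch (using a simplified local enum, not the crate's AgentEvent type):

```rust
// Simplified stand-in for AgentEvent, covering only ordering-relevant variants.
#[derive(Debug, PartialEq)]
enum Ev {
    IterationStarted(u32),
    ToolStarted(&'static str),
    ToolCompleted(&'static str),
    TextDone,
}

// True if every ToolStarted precedes its own ToolCompleted and all tool
// completions in a batch precede the next IterationStarted.
fn ordering_holds(events: &[Ev]) -> bool {
    let mut in_flight: Vec<&str> = Vec::new();
    for ev in events {
        match ev {
            Ev::IterationStarted(_) if !in_flight.is_empty() => return false,
            Ev::ToolStarted(name) => in_flight.push(*name),
            Ev::ToolCompleted(name) => {
                match in_flight.iter().position(|&n| n == *name) {
                    Some(i) => { in_flight.remove(i); }
                    None => return false, // completed before it started
                }
            }
            _ => {}
        }
    }
    in_flight.is_empty()
}

fn main() {
    // A two-iteration run: parallel tools may interleave within a batch.
    let log = [
        Ev::IterationStarted(1),
        Ev::ToolStarted("search"),
        Ev::ToolStarted("fetch"),
        Ev::ToolCompleted("fetch"),
        Ev::ToolCompleted("search"),
        Ev::IterationStarted(2),
        Ev::TextDone,
    ];
    assert!(ordering_holds(&log));
}
```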

Backpressure Behavior

AgentSession uses bounded channels for both user input and per-turn operations. The BackpressurePolicy controls what happens when the ops channel is full:

Policy Behavior Telemetry event
Block (default) Sender awaits until capacity is available None
Reject send_op() returns false immediately OpRejected, OpsSaturated (once)
DropOldest Drops the incoming op (despite the name, not a true ring buffer) OpDropped, OpsSaturated (once)

OpsSaturated fires exactly once when the channel first reaches capacity within a turn. The saturation flag resets at each new turn.
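The Reject policy behaves like a non-blocking send on a bounded channel. A std-only analogue (std::sync::mpsc stands in for the session's internal queue; AgentSession itself runs on tokio):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Bounded channel with capacity 2, standing in for the ops queue.
    let (tx, rx) = sync_channel::<&str>(2);

    assert!(tx.try_send("op-1").is_ok());
    assert!(tx.try_send("op-2").is_ok());

    // Channel is full: a Reject-style policy surfaces this to the caller
    // instead of blocking (Block) or discarding the op (DropOldest).
    assert!(matches!(tx.try_send("op-3"), Err(TrySendError::Full(_))));

    // Draining one op frees capacity again.
    assert_eq!(rx.recv().unwrap(), "op-1");
    assert!(tx.try_send("op-3").is_ok());
}
```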

Session Lifecycle Guarantees

AgentSession provides the following invariants (see session module docs for full details):

  • Epoch-based reset isolation: reset() bumps an atomic epoch; any in-flight turn that observes a stale epoch silently discards its results.
  • No duplicate user-message persistence: The user message is appended to the store once before the turn; post-turn write-back skips it.
  • Close semantics: close() sets a shared flag, interrupts the current turn, and performs a final flush. New send() calls are silently dropped.
  • Drop behavior: Best-effort flush. Call flush() or close() explicitly for strict durability.
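The epoch-based reset isolation can be sketched with an atomic counter. This is a simplified model of the invariant, not the crate's implementation:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// A turn snapshots the epoch at start and only commits its results
// if the epoch is still current when the turn finishes.
struct Session {
    epoch: AtomicU64,
    history: std::sync::Mutex<Vec<String>>,
}

impl Session {
    fn begin_turn(&self) -> u64 {
        self.epoch.load(Ordering::Acquire)
    }

    fn reset(&self) {
        self.epoch.fetch_add(1, Ordering::AcqRel);
        self.history.lock().unwrap().clear();
    }

    // Write-back is skipped when a reset happened mid-turn.
    fn commit(&self, turn_epoch: u64, answer: String) -> bool {
        if self.epoch.load(Ordering::Acquire) != turn_epoch {
            return false; // stale turn: silently discard
        }
        self.history.lock().unwrap().push(answer);
        true
    }
}

fn main() {
    let s = Session {
        epoch: AtomicU64::new(0),
        history: std::sync::Mutex::new(Vec::new()),
    };

    let turn = s.begin_turn();
    s.reset(); // user resets while the turn is in flight
    assert!(!s.commit(turn, "stale answer".into()));
    assert!(s.history.lock().unwrap().is_empty());

    let turn2 = s.begin_turn();
    assert!(s.commit(turn2, "fresh answer".into()));
}
```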

Migration Notes (v0.4 to v0.7)

New in v0.7:

  • AgentOp and AgentLoop::run_with_ops() for interactive control.
  • AgentEvent::Interrupted, AskUser, and AskUserTimeout variants.
  • Built-in ask_user tool via with_ask_user() / with_ask_user_timeout().
  • AgentSession for long-lived background sessions with persistence.
  • SessionStore trait with MemorySessionStore and FileSessionStore.
  • BackpressurePolicy and ChannelConfig for bounded queue management.
  • AgentEvent::OpsSaturated, OpDropped, OpRejected telemetry events.
  • AgentSession::is_closed() method.

Non-breaking additions — all new types are additive. Existing code using AgentLoop::run() or run_streaming() continues to work unchanged. The ChannelConfig defaults (capacity 64, Block policy) preserve prior behavior.

Dependency Tree

motosan-agent-loop
  +-- motosan-agent-tool   (Tool / ToolDef / ToolResult)
  +-- async-trait
  +-- futures
  +-- serde / serde_json
  +-- thiserror
  +-- [motosan-ai]         (optional: motosan-ai feature)
  +-- [rmcp]               (optional: mcp-client feature)
  +-- [tokio-util]         (optional: cancellation feature)

License

MIT