# motosan-agent-loop
A standalone ReAct agent loop for Rust. It drives any LLM through iterative
reasoning and tool execution, producing a final answer — with zero platform
dependencies. Bring your own LLM backend (OpenAI, Anthropic, local models, or a
test double) by implementing the `LlmClient` trait, register tools via
`motosan-agent-tool`, and let `AgentLoop` handle the rest.
## Features
| Feature | Default | Description |
|---|---|---|
| `motosan-ai` | off | `LlmClient` bridge for the motosan-ai SDK |
| `mcp-client` | off | MCP (Model Context Protocol) server support via rmcp |
| `cancellation` | off | `CancellationToken` support for graceful loop cancellation |
## Quick Start
```toml
[dependencies]
motosan-agent-loop = "0.7"
motosan-agent-tool = "0.2"
async-trait = "0.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```
```rust
use async_trait::async_trait;
use motosan_agent_loop::{AgentLoop, ChatOutput, LlmClient, Message, Result};
use motosan_agent_tool::ToolDef;

struct MyLlm; // your LLM backend

#[async_trait]
impl LlmClient for MyLlm {
    async fn chat(&self, messages: &[Message], tools: &[ToolDef]) -> Result<ChatOutput> {
        todo!() // call your LLM here and map the reply to ChatOutput
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let agent = AgentLoop::builder(MyLlm).build();
    let result = agent.run("What is 2 + 2?").await?;
    println!("{}", result.answer);
    Ok(())
}
```
## Builder API
```rust
let agent = AgentLoop::builder(client)
    .tool(my_tool)                          // register a tool
    .tools(vec![tool_a, tool_b])            // register multiple tools
    .system_prompt("You are a helpful assistant.") // shortcut for a ContextProvider
    .context(my_context_provider)           // dynamic context injection
    .max_iterations(10)                     // default: 10
    .tool_timeout(Duration::from_secs(30))  // per-tool call timeout
    .build();
```
## Streaming
```rust
let result = agent
    .run_streaming(prompt)
    .await?;
```
Override `LlmClient::chat_stream()` for true streaming; the default implementation falls back to `chat()`.
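The default-fallback pattern can be pictured with a synchronous, std-only sketch (the real trait is async and streams `StreamChunk` values; `Stub` here is a stand-in backend, not part of the crate):

```rust
// Illustrative sketch of a trait default method that falls back to chat().
trait LlmClient {
    fn chat(&self) -> String;

    // Default: call chat() and emit the whole reply as a single chunk.
    // Backends with real streaming override this.
    fn chat_stream(&self) -> Vec<String> {
        vec![self.chat()]
    }
}

struct Stub;
impl LlmClient for Stub {
    fn chat(&self) -> String {
        "hello".into()
    }
}

fn main() {
    // Stub never overrode chat_stream(), so the default kicks in.
    assert_eq!(Stub.chat_stream(), vec!["hello".to_string()]);
}
```

This is why implementing only `chat()` is enough to get started: streaming callers still work, just without incremental chunks.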
## Interactive Control (`run_with_ops`)
Use `run_with_ops()` when you need to interrupt or inject input while a turn is running.
```rust
use tokio::sync::mpsc;
use motosan_agent_loop::AgentOp;

let (ops_tx, ops_rx) = mpsc::channel(64);
let run = agent.run_with_ops(prompt, ops_rx);

// From another task:
let _ = ops_tx.send(AgentOp::InjectUserMessage("also check tomorrow".into())).await; // one of the Inject* variants
let _ = ops_tx.send(AgentOp::Interrupt).await;

let result = run.await?;
```
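Conceptually, this amounts to draining an ops channel between loop iterations. A minimal std-only sketch of that pattern (illustrative only; `Op` and `run_loop` are hypothetical stand-ins, not the crate's implementation):

```rust
use std::sync::mpsc;

// Hypothetical stand-in for AgentOp.
enum Op {
    Interrupt,
    Inject(String),
}

// A toy loop that checks for pending ops before each "LLM round-trip".
fn run_loop(ops_rx: mpsc::Receiver<Op>) -> String {
    let mut injected = Vec::new();
    for _iteration in 0..10 {
        // Drain all pending ops without blocking.
        while let Ok(op) = ops_rx.try_recv() {
            match op {
                Op::Interrupt => return "interrupted".to_string(),
                Op::Inject(text) => injected.push(text),
            }
        }
        // ... LLM call and tool execution would happen here ...
    }
    format!("done with {} injected messages", injected.len())
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(Op::Inject("extra context".into())).unwrap();
    tx.send(Op::Interrupt).unwrap();
    // The interrupt is observed at the next iteration boundary.
    assert_eq!(run_loop(rx), "interrupted");
}
```

Checking ops at iteration boundaries is what makes interruption "graceful": a tool call in flight finishes before the loop stops.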
## AgentSession (Long-Lived Background Session)
```rust
use std::sync::Arc;
use motosan_agent_loop::{AgentSession, FileSessionStore};

let session = AgentSession::new(agent);
session.send("hello").await;
session.inject("additional context").await;
session.interrupt().await;

// Persistent session
let store = Arc::new(FileSessionStore::new("./sessions"));
let persistent = AgentSession::new_with_store(agent, store);

// For strict durability guarantees before shutdown:
persistent.flush().await?;
// or:
// persistent.close().await?;
```
`FileSessionStore` starts periodic auto-flush only when constructed inside a Tokio runtime.
If constructed outside a runtime, call `flush()` explicitly for durability.
## MCP Server Support
Requires the `mcp-client` feature.
```toml
motosan-agent-loop = { version = "0.7", features = ["mcp-client"] }
```
```rust
use motosan_agent_loop::{McpServerHttp, McpServerStdio};

let agent = AgentLoop::builder(client)
    .tool(my_tool)
    // Stdio-based MCP server (spawns a child process)
    .mcp_server(McpServerStdio::new("my-mcp-server", &["--stdio"]))
    // HTTP-based MCP server
    .mcp_server(McpServerHttp::new("http://localhost:8080/mcp"))
    .build();

// MCP servers are auto-connected on run() and disconnected after.
let result = agent.run(prompt).await?;
```
## Cancellation
Requires the `cancellation` feature.
```toml
motosan-agent-loop = { version = "0.7", features = ["cancellation"] }
```
```rust
use tokio_util::sync::CancellationToken;

let token = CancellationToken::new();

// Cancel from another task after 5 seconds
let t = token.clone();
tokio::spawn(async move {
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    t.cancel();
});

let result = agent
    .run_with_cancel(prompt, token)
    .await;
// Returns Err(AgentError::Cancelled) if cancelled mid-loop.
```
## motosan-ai Bridge
Requires the `motosan-ai` feature.
```toml
motosan-agent-loop = { version = "0.7", features = ["motosan-ai"] }
motosan-ai = { version = "0.5", features = ["anthropic"] }
```
```rust
use motosan_ai::Client;

let client = Client::builder()
    .provider("anthropic")
    .api_key(std::env::var("ANTHROPIC_API_KEY")?)
    .build()?;

// The client implements LlmClient; use it directly.
let agent = AgentLoop::builder(client).build();
let result = agent.run(prompt).await?;
```
## Core Types
| Type | Description |
|---|---|
| `LlmClient` | Trait — implement `chat()` (and optionally `chat_stream()`) for your LLM backend |
| `ChatOutput` | Response wrapper with `LlmResponse` + optional `TokenUsage` |
| `LlmResponse` | `Message(String)` or `ToolCalls(Vec<ToolCallItem>)` |
| `StreamChunk` | `TextDelta(String)`, `Done(LlmResponse)`, or `Usage(TokenUsage)` |
| `AgentLoop` | The core ReAct loop — call `run()` or `run_streaming()` |
| `AgentOp` | Interactive commands for `run_with_ops()` (`Interrupt`, `Inject*`, `AskUserAnswer`) |
| `AgentResult` | Final answer, tool call history, iteration count, token usage, messages |
| `AgentEvent` | Callback events including streaming/tool events and interactive `AskUser`/`Interrupted` |
| `AgentSession` | Long-lived background session wrapper with managed history |
| `SessionStore` | Persistence trait for session history (`MemorySessionStore`, `FileSessionStore`) |
| `ContextProvider` | Trait — inject dynamic context (RAG docs, user profiles) into conversations |
| `McpServer` | Trait — MCP server abstraction (`McpServerStdio`, `McpServerHttp`) |
## Compatibility

### Public API Surface
The following types and traits constitute the stable public API. Breaking changes to these will follow semver and be documented in the CHANGELOG.
| Category | Types |
|---|---|
| Core loop | `AgentLoop`, `AgentLoopBuilder`, `AgentResult` |
| LLM trait | `LlmClient`, `ChatOutput`, `LlmResponse`, `StreamChunk`, `TokenUsage`, `ToolCallItem` |
| Messages | `Message`, `Role`, `ToolCallRef` |
| Interactive | `AgentOp`, `AgentEvent` |
| Session | `AgentSession`, `SessionStore`, `MemorySessionStore`, `FileSessionStore`, `SessionMeta` |
| Context | `ContextProvider` |
| Channels | `BackpressurePolicy`, `ChannelConfig` |
| Errors | `AgentError`, `Result` |
### Event Ordering Guarantees
Events emitted via the `on_event` callback follow a deterministic ordering
within each iteration:

1. `IterationStarted(n)` — emitted once at the start of each LLM round-trip.
2. `ToolStarted { name }` — emitted for each tool before execution begins.
3. `ToolCompleted { name, result }` — emitted after each tool finishes.
4. `TextChunk(delta)` / `TextDone(full)` — emitted when the LLM produces a final text response (streaming or non-streaming).
Within a parallel tool batch, each tool's `ToolStarted` precedes its own
`ToolCompleted`, but events from different tools may interleave. All tool
completions in a batch precede the next `IterationStarted`.
For `ask_user` tool calls, `AskUser { call_id, question, options }` is
emitted between `ToolStarted` and `ToolCompleted`. If no answer arrives
before the timeout, `AskUserTimeout { call_id, question }` is emitted.
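The ordering invariant can be sketched with a toy event emitter (illustrative only; the real loop is async, and tools in a batch may interleave):

```rust
#[derive(Debug, PartialEq)]
enum Event {
    IterationStarted(u32),
    ToolStarted(&'static str),
    ToolCompleted(&'static str),
    TextDone(&'static str),
}

// Emit events for one tool-calling iteration followed by a final answer.
fn emit(tools: &[&'static str]) -> Vec<Event> {
    let mut events = vec![Event::IterationStarted(1)];
    for t in tools {
        events.push(Event::ToolStarted(t));
        events.push(Event::ToolCompleted(t)); // sequential here; may interleave in a parallel batch
    }
    events.push(Event::IterationStarted(2));
    events.push(Event::TextDone("final answer"));
    events
}

fn main() {
    let events = emit(&["search", "calc"]);
    // Every ToolCompleted in the batch lands before the next IterationStarted.
    let next_iter = events.iter().position(|e| *e == Event::IterationStarted(2)).unwrap();
    let last_done = events.iter().rposition(|e| matches!(e, Event::ToolCompleted(_))).unwrap();
    assert!(last_done < next_iter);
}
```

Consumers can therefore treat `IterationStarted` as a barrier: everything between two of them belongs to one LLM round-trip.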
### Backpressure Behavior
`AgentSession` uses bounded channels for both user input and per-turn
operations. The `BackpressurePolicy` controls what happens when the ops
channel is full:
| Policy | Behavior | Telemetry event |
|---|---|---|
| `Block` (default) | Sender awaits until capacity is available | None |
| `Reject` | `send_op()` returns `false` immediately | `OpRejected`, `OpsSaturated` (once) |
| `DropOldest` | Drops the incoming op (not a true ring buffer) | `OpDropped`, `OpsSaturated` (once) |
`OpsSaturated` fires exactly once when the channel first reaches capacity
within a turn. The saturation flag resets at each new turn.
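The `Reject` policy corresponds to a non-blocking send against a bounded channel. A std-only sketch of that behavior (the crate uses async channels; `send_op_reject` is a hypothetical helper, and `Block` would instead await capacity):

```rust
use std::sync::mpsc;

// Reject policy: return false when the bounded channel is full,
// instead of waiting for capacity.
fn send_op_reject<T>(tx: &mpsc::SyncSender<T>, op: T) -> bool {
    tx.try_send(op).is_ok()
}

fn main() {
    let (tx, _rx) = mpsc::sync_channel(2); // capacity 2

    assert!(send_op_reject(&tx, 1));
    assert!(send_op_reject(&tx, 2));
    // Channel is now saturated: this is the point where OpsSaturated
    // would fire (once per turn) alongside OpRejected.
    assert!(!send_op_reject(&tx, 3));
}
```

The one-shot `OpsSaturated` signal keeps telemetry quiet under sustained pressure: a flood of rejected ops produces one saturation event, not thousands.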
### Session Lifecycle Guarantees
`AgentSession` provides the following invariants (see the session module docs
for full details):

- **Epoch-based reset isolation:** `reset()` bumps an atomic epoch; any in-flight turn that observes a stale epoch silently discards its results.
- **No duplicate user-message persistence:** the user message is appended to the store once before the turn; the post-turn write-back skips it.
- **Close semantics:** `close()` sets a shared flag, interrupts the current turn, and performs a final flush. New `send()` calls are silently dropped.
- **Drop behavior:** best-effort flush. Call `flush()` or `close()` explicitly for strict durability.
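Epoch-based reset isolation can be sketched with an atomic counter (illustrative; this toy `Session` is not the crate's type, and the real session also manages history persistence):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

struct Session {
    epoch: AtomicU64,
    history: Vec<String>,
}

impl Session {
    // reset() bumps the epoch; in-flight turns holding the old value become stale.
    fn reset(&mut self) {
        self.epoch.fetch_add(1, Ordering::SeqCst);
        self.history.clear();
    }

    // A turn records the epoch at its start and only commits if it still matches.
    fn commit_turn(&mut self, started_epoch: u64, result: String) -> bool {
        if self.epoch.load(Ordering::SeqCst) != started_epoch {
            return false; // stale turn: silently discard
        }
        self.history.push(result);
        true
    }
}

fn main() {
    let mut s = Session { epoch: AtomicU64::new(0), history: Vec::new() };
    let started = s.epoch.load(Ordering::SeqCst);

    s.reset(); // reset happens while the turn is "in flight"

    // The stale turn's result is discarded rather than polluting fresh history.
    assert!(!s.commit_turn(started, "answer".into()));
    assert!(s.history.is_empty());
}
```

The comparison at commit time is what makes `reset()` safe without locking the whole turn: stale writers lose the race cheaply.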
## Migration Notes (v0.4 to v0.7)
New in v0.7:

- `AgentOp` and `AgentLoop::run_with_ops()` for interactive control.
- `AgentEvent::Interrupted`, `AskUser`, and `AskUserTimeout` variants.
- Built-in `ask_user` tool via `with_ask_user()` / `with_ask_user_timeout()`.
- `AgentSession` for long-lived background sessions with persistence.
- `SessionStore` trait with `MemorySessionStore` and `FileSessionStore`.
- `BackpressurePolicy` and `ChannelConfig` for bounded queue management.
- `AgentEvent::OpsSaturated`, `OpDropped`, and `OpRejected` telemetry events.
- `AgentSession::is_closed()` method.
Non-breaking additions — all new types are additive. Existing code using
`AgentLoop::run()` or `run_streaming()` continues to work unchanged. The
`ChannelConfig` defaults (capacity 64, `Block` policy) preserve prior behavior.
## Dependency Tree

```text
motosan-agent-loop
+-- motosan-agent-tool (Tool / ToolDef / ToolResult)
+-- async-trait
+-- futures
+-- serde / serde_json
+-- thiserror
+-- [motosan-ai] (optional: motosan-ai feature)
+-- [rmcp] (optional: mcp-client feature)
+-- [tokio-util] (optional: cancellation feature)
```
## License
MIT