Tianshu 天枢
The celestial pivot — the simplest way to orchestrate long-running AI agent workflows in Rust.
简体中文 | English
What is Tianshu?
Tianshu is a checkpoint-safe, coroutine-like workflow engine for building AI agent orchestration systems in Rust.
Most workflow frameworks ask you to think in graphs: define nodes, connect edges, wire up state schemas. Tianshu takes a different approach — you write normal sequential async code. Each ctx.step() call is automatically checkpointed. If your process crashes, it resumes from the last completed step with zero extra configuration.
```rust
// Sketch of the programming model; exact trait and type names may differ:
async fn run(&self, ctx: &mut WorkflowContext) -> Result<Outcome> {
    let user = ctx.step("load_user", |_| async { load_user().await }).await?;
    let reply = ctx.step("call_llm", |_| async move { ask_llm(&user).await }).await?;
    ctx.step("send_reply", |_| async move { send_reply(&reply).await }).await?;
    Ok(Outcome::Finished)
}
```
That's it. No node definitions. No edge wiring. No state schema. Just code.
The core idea
The mental model is borrowed from coroutines: suspend at a checkpoint, resume later. Except you don't write coroutine boilerplate — ctx.step() handles it for you.
| What you write | What happens |
|---|---|
ctx.step("name", |_| async { ... }) |
Step executes; result is persisted to storage |
| Process crashes mid-step | Next run re-executes only that step |
| Process crashes between steps | Next run skips all completed steps, resumes from the failed one |
| Step completes normally | Checkpoint is stored; never re-executed |
This makes long-term tasks natural. A workflow can return Waiting(...) to sleep for hours or days until an external event arrives — without holding any thread or connection.
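A rough sketch of what that return looks like (the Outcome and Poll names are assumptions based on identifiers elsewhere in this README, not verified signatures):

```rust
// Sketch: end this run by suspending until a matching external event arrives.
// Outcome::Waiting and Poll::event are illustrative names.
return Ok(Outcome::Waiting(vec![Poll::event("payment_received")]));
```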
Tianshu vs LangGraph
LangGraph is an excellent tool. Tianshu solves a different problem: giving you the simplest possible mental model for building durable, long-running agent workflows in a production Rust service.
| | LangGraph | Tianshu |
|---|---|---|
| Language | Python | Rust |
| Mental model | Graph: define nodes + edges explicitly | Coroutine-like: write sequential async code |
| Checkpointing | Configure a checkpointer on graph compile | Automatic — every ctx.step() is a checkpoint |
| Crash recovery | Resumes from last checkpoint if configured | Always on — restart process, resume from last step |
| Long-running tasks | Supported | First-class — Waiting(polls) suspends with zero resources |
| Storage backends | SQLite, PostgreSQL, Redis (official) | Any database — implement two small traits |
| LLM integration | Via LangChain (Python ecosystem) | Via LlmProvider trait — any API, any vendor |
| Concurrency | Python asyncio | Rust Tokio — native async/await, no GIL |
| Observability | LangSmith (commercial platform) | Structured logging via tracing (see Observability) |
| Tool orchestration | ToolNode / custom | Tool trait + ToolRegistry — read/write concurrency |
| Streaming LLM | Via LangChain streaming | First-class StreamingLlmProvider trait |
| Error recovery | Custom retry logic | RetryPolicy + ResilientLlmProvider with fallbacks |
| Sub-workflow spawning | Subgraphs | ctx.spawn_child() — checkpoint-safe, zero resources while waiting |
| Context management | Custom | ManagedConversation — auto-compacts at configurable threshold |
| License | MIT | MIT |
Code comparison
LangGraph — define nodes, connect edges, compile:
```python
# Sketch (simplified); LangGraph asks you to wire the graph explicitly.
# load, llm, and checkpointer are assumed to be defined elsewhere.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    query: str
    data: str
    summary: str

def fetch_data(state: State):
    return {"data": load(state["query"])}

def summarize(state: State):
    return {"summary": llm.invoke(state["data"])}

graph = StateGraph(State)
graph.add_node("fetch_data", fetch_data)
graph.add_node("summarize", summarize)
graph.add_edge(START, "fetch_data")
graph.add_edge("fetch_data", "summarize")
graph.add_edge("summarize", END)

app = graph.compile(checkpointer=checkpointer)
```
Tianshu — just write the flow:
```rust
// Sketch; the same flow as plain sequential steps (query and llm in scope):
async fn run(&self, ctx: &mut WorkflowContext) -> Result<Outcome> {
    let data = ctx.step("fetch_data", |_| async { load(&query).await }).await?;
    let _summary = ctx.step("summarize", |_| async move { llm.complete(&data).await }).await?;
    Ok(Outcome::Finished)
}
```
Key advantages
1. Coroutine-like — reads like normal code
No graph topology, no state schemas, no node/edge wiring. The flow of your workflow is the code. A new engineer can read it top to bottom and understand it.
2. Automatic crash recovery
Every ctx.step() persists its result before returning. On restart, completed steps are skipped, and execution resumes from exactly where it left off. You get fault tolerance for free, without thinking about it.
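For intuition, a sketch of the replay behaviour (step names are illustrative):

```rust
// First run: "charge_card" completes and is checkpointed, then the process
// crashes inside "send_email".
// Second run: "charge_card" is not re-executed; its stored result is returned
// immediately, and execution resumes at "send_email".
let receipt = ctx.step("charge_card", |_| async { charge().await }).await?;
ctx.step("send_email", |_| async move { email(&receipt).await }).await?;
```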
3. Long-term task support
Poll predicates let a workflow declare "wake me up when X arrives" and then suspend cleanly:
```rust
// Sketch; the Waiting outcome is from this README, Poll's constructor is illustrative:
return Ok(Outcome::Waiting(vec![
    Poll::new("approval_arrived", |event| event.kind == "approval"),
]));
```
The workflow holds no thread, no connection, no memory while waiting. It can resume hours or days later.
4. Database-agnostic storage
Three small traits cover persistence; implement them for your backend of choice.
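A minimal sketch of the shape they might take (the signatures below are assumptions for illustration, not the crate's exact API; SessionStore follows the same pattern):

```rust
use async_trait::async_trait;

// Illustrative only; consult the crate docs for the real trait definitions.
// Case and CaseId are engine types.
#[async_trait]
pub trait CaseStore: Send + Sync {
    async fn save_case(&self, case: &Case) -> anyhow::Result<()>;
    async fn load_case(&self, id: &CaseId) -> anyhow::Result<Option<Case>>;
}

#[async_trait]
pub trait StateStore: Send + Sync {
    async fn save_checkpoint(&self, case: &CaseId, step: &str, value: &[u8]) -> anyhow::Result<()>;
    async fn load_checkpoint(&self, case: &CaseId, step: &str) -> anyhow::Result<Option<Vec<u8>>>;
}
```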
SessionStore is intentionally minimal — session structure is highly business-specific, so users should implement it to match their schema. The engine provides InMemorySessionStore and PostgresSessionStore as reference implementations.
The community can (and should) build adapters for MySQL, MongoDB, Redis, DynamoDB, and anything else.
5. Any LLM, any vendor
```rust
// Sketch; constructor and builder arguments are illustrative:
let llm = OpenAiProvider::new(api_key, "gpt-4o");

// or Ollama (local)
let llm = OpenAiProvider::builder()
    .base_url("http://localhost:11434/v1")
    .build()?;

// or Doubao (or any other OpenAI-compatible endpoint)
let llm = OpenAiProvider::builder()
    .base_url("https://your-doubao-endpoint/v1") // placeholder endpoint
    .build()?;
```
6. Tool orchestration with concurrency-safe execution
Define tools with the Tool trait and register them in a ToolRegistry. Call ctx.tool_step() to run a full LLM-driven tool-use loop — the engine calls the model, executes tool calls, and feeds results back until the model returns plain text:
```rust
// Sketch; WeatherTool is an illustrative Tool implementation:
let mut tools = ToolRegistry::new();
tools.register(WeatherTool::new());

let result = ctx
    .tool_step("assist", &llm, &tools, "What's the weather in Berlin?")
    .await?;
```
ReadOnly tools run in parallel; Exclusive tools run alone. The engine partitions each round of tool calls automatically.
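As a sketch of how a tool might declare its mode (the trait shape below is an assumption for illustration, not the crate's exact API):

```rust
use async_trait::async_trait;

// Illustrative shape only; the real Tool trait may differ.
struct WeatherTool;

#[async_trait]
impl Tool for WeatherTool {
    fn name(&self) -> &str { "get_weather" }
    // ReadOnly tools can run alongside each other; Exclusive tools run alone.
    fn access(&self) -> ToolAccess { ToolAccess::ReadOnly }
    async fn call(&self, _args: serde_json::Value) -> anyhow::Result<serde_json::Value> {
        Ok(serde_json::json!({ "temp_c": 18 }))
    }
}
```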
7. Error recovery and resilient providers
Wrap any LlmProvider with retry and fallback logic:
```rust
// Sketch; field and constructor names are illustrative:
let policy = RetryPolicy { max_attempts: 3, ..Default::default() };
let llm = ResilientLlmProvider::new(primary, policy).with_fallback(backup);
```
Errors are classified automatically: Transient retries with backoff, ProviderOverloaded tries a fallback provider, Fatal stops immediately. Use ctx.step_with_retry() to apply retry logic at the step level:
```rust
// Sketch; the argument list is illustrative:
let result = ctx
    .step_with_retry("call_llm", &policy, |_| async { llm.complete(&prompt).await })
    .await?;
```
8. Streaming LLM responses
Implement StreamingLlmProvider to deliver LlmStreamEvent values as they arrive. The OpenAiProvider already implements it out of the box:
```rust
// Sketch; the event variant name is illustrative:
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
llm.stream(&request, tx).await?;
while let Some(event) = rx.recv().await {
    if let LlmStreamEvent::Token(token) = event {
        print!("{token}");
    }
}
```
9. Sub-workflow spawning
A workflow can spawn child workflows and wait for them — with full checkpoint safety. The parent suspends as Waiting, freeing all threads and connections until all children finish:
```rust
// Sketch; child construction is illustrative:
let handles = ctx.spawn_children(vec![child_a, child_b, child_c]).await?;
// Parent suspends here — no thread held — resumes when all children are done
let result = ctx.await_children(handles).await?;
```
10. Token-aware context compaction
ManagedConversation tracks token usage and automatically compacts conversation history before hitting context limits:
```rust
// Sketch; constructor arguments are illustrative:
let mut conv = ManagedConversation::new(llm.clone(), 128_000 /* token budget */);
conv.push(msg).await?; // auto-compacts if approaching the limit
ctx.set_managed_conversation(conv); // attach to workflow context
```
Use LlmSummaryCompaction to summarise dropped messages with an LLM call instead of truncating them.
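A rough sketch of wiring that in (the with_compaction builder method is an assumption for illustration):

```rust
// Illustrative: replace truncation with LLM-based summarisation of dropped messages.
let mut conv = ManagedConversation::new(llm.clone(), 128_000)
    .with_compaction(LlmSummaryCompaction::new(llm.clone()));
```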
Quick start
```toml
[dependencies]
workflow_engine = { git = "https://github.com/your-org/tianshu-rs" }
```
```rust
// Sketch; module paths, engine type, and method names are illustrative:
use std::sync::Arc;
use workflow_engine::{WorkflowEngine, InMemorySessionStore};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let sessions = Arc::new(InMemorySessionStore::new());
    let engine = WorkflowEngine::new(sessions);
    engine.run_case(MyWorkflow).await?; // MyWorkflow is your workflow type
    Ok(())
}
```
See examples/approval_workflow for a complete example with polling and stage transitions.
Sessions and cross-case variables
A session groups related cases. One session can have multiple cases running in parallel:
```rust
// Sketch; type and method names are illustrative:
use workflow_engine::{Session, Environment};

let session = Session::new("session_1")
    .with_metadata("customer", "acme");

let cases = vec![case_a, case_b, case_c]; // all share session_1

let env = Environment::new(session, cases)
    .with_execution_mode(ExecutionMode::Parallel);
```
Cases within a session can share session-scoped variables — state that is visible across cases:
```rust
// In any workflow's run() method (key and value are illustrative):
ctx.set_session_state("discount", 15).await?;

// Another case in the same session can read it:
let val: i32 = ctx.get_session_state("discount").await?;
```
Session-scoped variables use last-write-wins semantics. The engine provides no locking — workflows that need concurrency control for shared variables must implement it themselves.
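For example (key and values illustrative), if two cases race on the same key, the later write simply wins:

```rust
// Case A writes first:
ctx.set_session_state("quota", 10).await?;
// Case B writes afterwards, silently overwriting Case A's value:
ctx.set_session_state("quota", 20).await?;
// Every case in the session now reads 20.
let quota: i32 = ctx.get_session_state("quota").await?;
```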
Crates
| Crate | Description |
|---|---|
| workflow_engine | Core library: scheduler, traits, in-memory stores |
| workflow_engine_postgres | PostgreSQL adapters for SessionStore + CaseStore + StateStore |
| workflow_engine_llm_openai | LlmProvider adapter for OpenAI-compatible APIs |
Running the example
```sh
# Start PostgreSQL (optional), e.g. via your own docker-compose setup

# Run with in-memory stores
cargo run --example approval_workflow

# Run with PostgreSQL
DATABASE_URL=postgres://postgres:postgres@localhost/workflow_engine \
  cargo run --example approval_workflow
```
Observability
Tianshu emits structured logs via the tracing crate at every meaningful state transition:
- Scheduler tick phases (partition → probe → evaluate → execute)
- Workflow state changes (Running → Waiting → Finished)
- Poll predicate evaluation and matches
- Checkpoint save and restore
- Database operations and LLM calls
Configure output format via tracing-subscriber:
```rust
use tracing_subscriber::{fmt, EnvFilter};

// JSON (for log aggregators)
fmt().json().with_env_filter(EnvFilter::from_default_env()).init();

// Text (for development)
fmt().with_env_filter(EnvFilter::from_default_env()).init();
```
What's not yet implemented:
- Step-level timing / duration spans
- Prometheus / metrics counters
- OpenTelemetry / distributed tracing
These are good first contributions. See CONTRIBUTING.md.
Roadmap
Recently implemented:
- Tool orchestration — Tool trait, ToolRegistry, ctx.tool_step()
- Error recovery — RetryPolicy, ResilientLlmProvider, ctx.step_with_retry()
- Streaming LLM — StreamingLlmProvider, LlmStreamEvent, OpenAI SSE streaming
- Sub-workflow spawning — ctx.spawn_child(), ctx.await_children()
- Context management — ManagedConversation, TruncationCompaction, LlmSummaryCompaction
Up next:
- OpenTelemetry integration (trace context propagation)
- Step-level timing spans
- workflow_engine_sqlite — SQLite adapter for lightweight deployments
- workflow_engine_mongodb — MongoDB adapter
- workflow_engine_llm_anthropic — Claude API adapter
- Intent routing example (LLM-based message classification)
- Admin API for inspecting and replaying workflows
Running tests
```sh
# Unit + in-memory integration tests (no database required)
cargo test

# PostgreSQL integration tests
DATABASE_URL=postgres://postgres:postgres@localhost/workflow_engine \
  cargo test
```