use std::sync::Arc;
use serde_json::Value as JsonValue;
use dotenvy::dotenv;
use langgraph::prelude::*;
use langgraph_derive::langgraph_state;
use langgraph_prebuilt::{ask_json, print_stream, response_text, stream_llm, BaseChatModel, Message};
use langgraph_providers::openai::{OpenAIModel, OpenAIModelConfig};
/// Reads the OpenAI connection settings from the process environment.
///
/// A `.env` file is loaded first when one can be found; failure to load it is
/// ignored so that plain environment variables keep working.
///
/// Returns `(api_key, api_base, model_name)`:
/// * `api_key` comes from `OPENAI_API_KEY` (required),
/// * `api_base` comes from `OPENAI_API_BASE` and is `None` when unset,
/// * `model_name` comes from `OPENAI_MODEL`, defaulting to `"mimo-v2.5-pro"`.
///
/// # Panics
///
/// Panics when `OPENAI_API_KEY` cannot be read from the environment.
fn load_openai_config() -> (String, Option<String>, String) {
    // Best effort: a missing or unreadable .env file is not an error here.
    let _ = dotenv();

    let read_var = |name: &str| std::env::var(name);

    let api_key = read_var("OPENAI_API_KEY")
        .expect("OPENAI_API_KEY must be set in .env or environment");
    let api_base = read_var("OPENAI_API_BASE").ok();
    let model_name = read_var("OPENAI_MODEL").unwrap_or_else(|_| "mimo-v2.5-pro".to_string());

    (api_key, api_base, model_name)
}
// State carried through the plan-and-act graph.
//
// `main` calls `ManusState::create_channels()` and hands the result to
// `StateGraph::new`. That function is not defined in this file; presumably it
// is generated by `#[langgraph_state]` from the `#[channel]` attributes below
// (TODO confirm against the langgraph_derive documentation).
#[langgraph_state]
#[derive(Debug)]
struct ManusState {
// Message list. The graph input supplies a "messages" array and the
// summarizer node returns one. How updates are merged is decided by the
// `messages` channel kind, which is not visible here.
#[channel(messages)]
messages: Vec<Message>,
// Current plan as free-form JSON. The nodes read and write it under the
// "plan" key and expect an object with "title" and a "steps" array.
#[channel]
plan: JsonValue,
}
fn pending_steps(plan: &JsonValue) -> usize {
plan.get("steps")
.and_then(|s| s.as_array())
.map(|steps| {
steps
.iter()
.filter(|s| s.get("status").and_then(|v| v.as_str()) != Some("completed"))
.count()
})
.unwrap_or(0)
}
fn user_message(input: &JsonValue) -> &str {
input
.get("messages")
.and_then(|m| m.as_array())
.and_then(|msgs| msgs.last())
.and_then(|m| m.get("content"))
.and_then(|c| c.as_str())
.unwrap_or("")
}
/// Prompt prefix used by the `planner` node; `main` appends the user's request
/// to it before calling the model.
///
/// Asks for a bare JSON object with a `title` and a `steps` array whose
/// entries carry a `description` and a `status` of `"pending"`.
const CREATE_PLAN_PROMPT: &str = r#"You are a planning agent. Given the user's request, create a step-by-step plan.
Respond in JSON format ONLY (no markdown fences):
{
"title": "Plan title",
"steps": [
{ "description": "Step 1 description", "status": "pending" },
{ "description": "Step 2 description", "status": "pending" }
]
}
Keep the plan concise with 2-5 actionable steps."#;
/// Prompt prefix used by the `executor` node; `main` appends the description
/// of the step being executed.
///
/// Asks for a bare JSON object with a boolean `success` and a string `result`.
const EXECUTE_STEP_PROMPT: &str = r#"You are an execution agent. Execute the current step of the plan.
Respond in JSON format ONLY (no markdown fences):
{
"success": true,
"result": "Description of what was done and the result"
}
If the step fails, set success to false and explain the error in result."#;
/// Prompt prefix used by the `replanner` node; `main` appends the current plan
/// as pretty-printed JSON.
///
/// Asks for the full updated plan in the same `title`/`steps` shape, with
/// finished steps marked `"completed"` and carrying a `result`.
const REPLAN_PROMPT: &str = r#"You are a planning agent. A step has just been completed. Review and update the plan.
Respond with the FULL updated plan in JSON format ONLY (no markdown fences):
{
"title": "Plan title",
"steps": [
{ "description": "Step description", "status": "completed", "result": "what happened" },
{ "description": "Next step description", "status": "pending" }
]
}
Mark completed steps with status "completed". Adjust remaining steps based on what was learned."#;
/// Prompt prefix used by the `summarizer` node; `main` appends the completed
/// plan as pretty-printed JSON.
///
/// Asks for a bare JSON object with a single `message` field. Note that the
/// summarizer does not parse the reply: it prints and stores the raw response
/// text.
const SUMMARIZE_PROMPT: &str = r#"All plan steps have been completed. Write a concise summary of what was accomplished.
Respond in JSON format ONLY (no markdown fences):
{
"message": "Summary of the work done"
}"#;
/// Builds the plan-and-act graph, runs it once on a fixed demo request, and
/// prints the streamed output.
///
/// Graph topology:
///
/// ```text
/// START -> planner -> executor
/// executor  -> replanner   (steps still pending)
///           -> summarizer  (every step completed)
///           -> END         (the plan has no steps at all)
/// replanner -> executor    (steps still pending)
///           -> summarizer  (nothing left to do)
/// summarizer -> END
/// ```
///
/// # Errors
///
/// Returns an error when a node or edge cannot be registered or when the
/// graph fails to compile. Errors raised while streaming are currently
/// discarded (see the note next to `print_stream`).
///
/// # Panics
///
/// Panics inside `load_openai_config` when `OPENAI_API_KEY` is not set.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("========================================");
    println!(" Manus-like Plan-and-Act Agent");
    println!("========================================\n");

    let (api_key, api_base, model_name) = load_openai_config();
    // One model instance is shared by every node through the `Arc`.
    let model: Arc<dyn BaseChatModel> = Arc::new(OpenAIModel::new(OpenAIModelConfig {
        model: model_name,
        api_key,
        api_base,
        temperature: Some(0.3),
        ..Default::default()
    }));

    let channels = ManusState::create_channels();
    let mut graph = StateGraph::new(channels);

    // planner: turns the last message of the input into a JSON plan.
    let m = model.clone();
    graph.add_node("planner", move |input: JsonValue, _config: RunnableConfig| {
        let model = m.clone();
        async move {
            let prompt = format!("{}\n\nUser request: {}", CREATE_PLAN_PROMPT, user_message(&input));
            // When `ask_json` yields no JSON, fall back to an empty plan.
            let plan = ask_json(model.as_ref(), &prompt, "").await?
                .unwrap_or_else(|| serde_json::json!({"title": "Untitled", "steps": []}));
            // Look the steps up once; a missing or mistyped `steps` prints as empty.
            let steps = plan
                .get("steps")
                .and_then(|s| s.as_array())
                .map(|a| a.as_slice())
                .unwrap_or(&[]);
            println!("[planner] Created plan: {} ({} steps)",
                plan.get("title").and_then(|t| t.as_str()).unwrap_or("Untitled"), steps.len());
            for (i, step) in steps.iter().enumerate() {
                println!(" {}. {}", i + 1, step.get("description").and_then(|d| d.as_str()).unwrap_or(""));
            }
            Ok(serde_json::json!({"plan": plan}))
        }
    })?;

    // executor: runs the first step that is not yet "completed".
    let m = model.clone();
    graph.add_node("executor", move |input: JsonValue, _config: RunnableConfig| {
        let model = m.clone();
        async move {
            let plan = input.get("plan").cloned().unwrap_or_default();
            let steps = plan.get("steps").and_then(|s| s.as_array()).cloned().unwrap_or_default();
            let idx = match steps.iter().position(|s| s.get("status").and_then(|v| v.as_str()) != Some("completed")) {
                Some(i) => i,
                // Nothing left to run: hand the plan back unchanged.
                None => return Ok(serde_json::json!({"plan": plan})),
            };
            let desc = steps[idx].get("description").and_then(|d| d.as_str()).unwrap_or("?");
            println!("\n[executor] Step {}/{}: {}", idx + 1, steps.len(), desc);
            let prompt = format!("{}\n\nCurrent step: {}", EXECUTE_STEP_PROMPT, desc);
            // A reply without JSON is recorded as a failed step.
            let exec = ask_json(model.as_ref(), &prompt, "").await?
                .unwrap_or_else(|| serde_json::json!({"success": false, "result": "Parse error"}));
            let success = exec.get("success").and_then(|v| v.as_bool()).unwrap_or(false);
            let step_result = exec.get("result").and_then(|r| r.as_str()).unwrap_or("");
            if success {
                println!("[executor] Done: {}", step_result);
            } else {
                println!("[executor] Failed: {}", step_result);
            }
            // The step is marked "completed" whether or not it succeeded, so the
            // loop always advances; a failure is visible only in `result`.
            let mut updated_plan = plan;
            if let Some(steps) = updated_plan.get_mut("steps").and_then(|s| s.as_array_mut()) {
                if let Some(step) = steps.get_mut(idx) {
                    if let Some(obj) = step.as_object_mut() {
                        obj.insert("status".to_string(), serde_json::json!("completed"));
                        obj.insert("result".to_string(), serde_json::json!(step_result));
                    }
                }
            }
            Ok(serde_json::json!({"plan": updated_plan}))
        }
    })?;

    // replanner: asks the model to revise the plan after each executed step.
    let m = model.clone();
    graph.add_node("replanner", move |input: JsonValue, _config: RunnableConfig| {
        let model = m.clone();
        async move {
            let plan = input.get("plan").cloned().unwrap_or_default();
            println!("[replanner] {} steps remaining, updating plan...", pending_steps(&plan));
            let prompt = format!("{}\n\nCurrent plan:\n{}", REPLAN_PROMPT, serde_json::to_string_pretty(&plan).unwrap_or_default());
            // Keep the current plan when the reply contains no JSON.
            let updated_plan = ask_json(model.as_ref(), &prompt, "").await?
                .unwrap_or(plan);
            println!("[replanner] Plan updated, {} steps remaining", pending_steps(&updated_plan));
            Ok(serde_json::json!({"plan": updated_plan}))
        }
    })?;

    // summarizer: produces the final answer from the finished plan.
    let m = model.clone();
    graph.add_node("summarizer", move |input: JsonValue, _config: RunnableConfig| {
        let model = m.clone();
        async move {
            let plan = input.get("plan").cloned().unwrap_or_default();
            println!("[summarizer] Generating summary...");
            let prompt = format!("{}\n\nCompleted plan:\n{}", SUMMARIZE_PROMPT, serde_json::to_string_pretty(&plan).unwrap_or_default());
            let result = stream_llm(model.as_ref(), &serde_json::json!({"messages": [{"type": "human", "content": prompt}]}), "").await?;
            let text = response_text(&result);
            println!("[summarizer] Done\n=== Summary ===\n{}\n===============", text);
            Ok(serde_json::json!({"messages": [{"type": "ai", "content": text}]}))
        }
    })?;

    graph.add_edge(START, "planner")?;
    graph.add_edge("planner", "executor")?;
    // The executor must be able to reach the summarizer directly: once the last
    // step has run nothing is pending, and going to END would skip the summary.
    conditional_edges!(graph, "executor", route_after_executor, "replanner" => "replanner", "summarizer" => "summarizer", END => END)?;
    conditional_edges!(graph, "replanner", route_after_replanner, "executor" => "executor", "summarizer" => "summarizer", END => END)?;
    graph.add_edge("summarizer", END)?;

    let app = graph.compile()?;
    let input = serde_json::json!({
        "messages": [{ "type": "human", "content": "规划深圳的3天旅游." }]
    });
    println!("User: 规划深圳的3天旅游.\n");
    let mut stream = app.astream(&input, &RunnableConfig::new(), vec![StreamMode::Custom, StreamMode::Updates]);
    // NOTE(review): the outcome of `print_stream` is discarded, so a failed run
    // still prints "Demo completed!". Its return type is not visible in this
    // file; consider propagating or logging the error once confirmed.
    let _ = print_stream(&mut stream, true).await;
    println!("\n========================================\n Demo completed!\n========================================");
    Ok(())
}

/// Chooses the node that follows `executor`.
///
/// * Returns `END` when the state holds no plan or the plan has no steps, as
///   there is nothing to execute or summarize.
/// * Returns `"summarizer"` when every step is completed.
/// * Returns `"replanner"` otherwise.
fn route_after_executor(input: &JsonValue) -> String {
    let plan = input.get("plan");
    let total_steps = plan
        .and_then(|p| p.get("steps"))
        .and_then(|s| s.as_array())
        .map_or(0, |steps| steps.len());
    if total_steps == 0 {
        return END.to_string();
    }

    // Borrow the plan for counting; no need to clone the whole JSON tree.
    let pending = plan.map_or(0, pending_steps);
    if pending == 0 {
        println!("[route] all steps completed → summarizer");
        return "summarizer".to_string();
    }
    println!("[route] {} steps remaining → replanner", pending);
    "replanner".to_string()
}
fn route_after_replanner(input: &JsonValue) -> String {
let pending = pending_steps(&input.get("plan").cloned().unwrap_or_default());
if pending == 0 { return "summarizer".to_string(); }
println!("[route] {} steps remaining → executor", pending);
"executor".to_string()
}