use synaps_cli::engine::setup::{self, EngineOpts};
use synaps_cli::engine::commands::{self, CommandResult};
use synaps_cli::engine::stream::{self, EngineStreamEvent, StreamCompletion, SubagentTracker};
use synaps_cli::engine::session::ConversationState;
use synaps_cli::{CancellationToken, flush_stdout};
use synaps_cli::core::compaction::compact_conversation;
use futures::StreamExt;
use serde_json::json;
use std::io::{self, Write, BufRead};
pub async fn run(
continue_session: Option<String>,
system: Option<String>,
agent: Option<String>,
profile: Option<String>,
no_extensions: bool,
) -> synaps_cli::Result<()> {
let boot = setup::boot(EngineOpts {
continue_session: continue_session.map(Some),
system,
profile,
no_extensions,
}).await?;
let mut runtime = boot.runtime;
let mut conv = if boot.continued {
ConversationState::from_resumed(boot.session)
} else {
ConversationState::new(boot.session)
};
if let Some(ref agent_name) = agent {
match synaps_cli::tools::resolve_agent_prompt(agent_name) {
Ok(p) => {
eprintln!("🎭 Agent: {}", agent_name);
runtime.set_system_prompt(p);
}
Err(e) => {
eprintln!("❌ {}", e);
std::process::exit(1);
}
}
}
if !no_extensions {
let (loader_tx, mut loader_rx) = tokio::sync::mpsc::unbounded_channel();
synaps_cli::extensions::loader::spawn_discover_and_load(
std::sync::Arc::clone(&boot.ext_manager),
loader_tx,
);
tokio::spawn(async move {
while loader_rx.recv().await.is_some() {}
});
}
eprintln!("synaps {} | {} | session {}",
env!("CARGO_PKG_VERSION"),
runtime.model(),
&conv.session.id[..8]
);
if boot.continued {
eprintln!("↳ resumed session ({} messages)", conv.api_messages.len());
}
if boot.mcp_server_count > 0 {
eprintln!("↳ {} MCP servers available", boot.mcp_server_count);
}
eprintln!();
let stdin = io::stdin();
let is_tty = std::io::IsTerminal::is_terminal(&std::io::stdin());
let mut subagents: Vec<SubagentTracker> = Vec::new();
let compact_threshold: usize = 80_000;
let mut last_compacted_tokens: usize = 0;
loop {
if is_tty {
eprint!("❯ ");
io::stderr().flush().ok();
}
let mut input = String::new();
match stdin.lock().read_line(&mut input) {
Ok(0) => break, Err(e) => {
eprintln!("input error: {}", e);
break;
}
Ok(_) => {}
}
let input = input.trim();
if input.is_empty() { continue; }
if let Some((cmd, arg)) = commands::parse_command(input) {
if let Some(result) = commands::handle_engine_command(cmd, arg, &mut runtime) {
match result {
CommandResult::Quit => break,
CommandResult::ModelChanged { model } => {
eprintln!("model → {}", model);
}
CommandResult::ThinkingChanged { level, budget } => {
eprintln!("thinking → {} ({})", level, budget);
}
CommandResult::Compact => {
eprintln!("compacting...");
if let Ok(summary) = compact_conversation(
&conv.api_messages, &runtime, None
).await {
conv.api_messages = vec![json!({
"role": "user",
"content": format!("<context-summary>\n{}\n</context-summary>", summary)
})];
last_compacted_tokens = conv.estimate_tokens();
eprintln!("compacted → ~{} tokens", last_compacted_tokens);
}
}
CommandResult::Error(e) => eprintln!("error: {}", e),
CommandResult::Output(text) => println!("{}", text),
_ => {} }
continue;
}
match cmd {
"clear" => {
conv.clear(&runtime).await;
eprintln!("session cleared → {}", &conv.session.id[..8]);
}
"sessions" => {
match synaps_cli::list_sessions() {
Ok(sessions) => {
for s in sessions.iter().take(20) {
let marker = if s.id == conv.session.id { "→ " } else { " " };
eprintln!("{}{} {} ({}, ${:.4})",
marker, &s.id[..8], s.title, s.model, s.session_cost);
}
}
Err(e) => eprintln!("error: {}", e),
}
}
"status" => {
eprintln!("session: {}", &conv.session.id[..8]);
eprintln!("model: {}", runtime.model());
eprintln!("tokens: {}↑ {}↓", conv.total_input_tokens, conv.total_output_tokens);
eprintln!("cost: ${:.4}", conv.session_cost);
eprintln!("messages: {}", conv.api_messages.len());
eprintln!("est. tokens: ~{}", conv.estimate_tokens());
}
"help" => {
eprintln!("commands: /model /thinking /compact /clear /sessions /status /quit");
}
_ => {
eprintln!("unknown command: /{} (try /help)", cmd);
}
}
continue;
}
let message = if let Some(ctx) = conv.abort_context.take() {
format!("{}\n\n[ABORT CONTEXT — your previous response was interrupted. Here's what you completed before the abort:]\n\n{}\n\n[END ABORT CONTEXT — continue from where you left off or adjust based on the user's new message]", input, ctx)
} else {
input.to_string()
};
conv.api_messages.push(json!({"role": "user", "content": message}));
let cancel = CancellationToken::new();
let mut stream = runtime.run_stream_with_messages(
conv.api_messages.clone(), cancel, None, None, false
).await;
let mut in_thinking = false;
while let Some(event) = stream.next().await {
let (engine_event, completion) = stream::process_stream_event(
event,
&mut conv.api_messages,
&mut subagents,
&mut conv.queued_message,
&mut conv.pending_events,
);
match engine_event {
EngineStreamEvent::Thinking(text) => {
if !in_thinking {
eprint!("\x1b[2m"); in_thinking = true;
}
eprint!("{}", text);
io::stderr().flush().ok();
}
EngineStreamEvent::Text(text) => {
if in_thinking {
eprintln!("\x1b[0m"); in_thinking = false;
}
print!("{}", text);
flush_stdout();
}
EngineStreamEvent::ToolStart { tool_name, .. } => {
if in_thinking { eprintln!("\x1b[0m"); in_thinking = false; }
eprint!("\x1b[33m⚙ {}\x1b[0m", tool_name);
io::stderr().flush().ok();
}
EngineStreamEvent::ToolFinalized { tool_name, input, .. } => {
let input_preview = serde_json::to_string(&input).unwrap_or_default();
let preview: String = input_preview.chars().take(60).collect();
eprintln!("\x1b[33m ⚙ {} ({})\x1b[0m", tool_name, preview);
}
EngineStreamEvent::ToolResult { result, .. } => {
let preview: String = result.chars().take(80).collect();
eprintln!("\x1b[32m → {}\x1b[0m", preview);
}
EngineStreamEvent::SubagentStart { name, task, .. } => {
eprintln!("\x1b[35m🎭 [{}] {}\x1b[0m", name, task);
}
EngineStreamEvent::SubagentDone { status, duration_secs, .. } => {
eprintln!("\x1b[32m✔ {} ({:.1}s)\x1b[0m", status, duration_secs);
}
EngineStreamEvent::Usage { input_tokens, output_tokens, cache_read, cache_creation, model } => {
let model_name = model.as_deref().unwrap_or(runtime.model());
conv.add_usage(input_tokens, output_tokens, cache_read, cache_creation, model_name);
}
EngineStreamEvent::SteeringDelivered { message } => {
eprintln!("\x1b[33m→ [steering] {}\x1b[0m", message);
}
EngineStreamEvent::Done | EngineStreamEvent::Noop => {}
EngineStreamEvent::Error(e) => {
eprintln!("\x1b[31m❌ {}\x1b[0m", e);
}
_ => {} }
match completion {
StreamCompletion::Done | StreamCompletion::Error(_) => {
if in_thinking { eprintln!("\x1b[0m"); }
println!();
break;
}
StreamCompletion::AutoSendQueued(queued) => {
conv.api_messages.push(json!({"role": "user", "content": queued}));
break;
}
StreamCompletion::AutoTriggerEvents => break,
StreamCompletion::Continue => {}
}
}
conv.save().await;
let est = conv.estimate_tokens();
let threshold = if last_compacted_tokens > 0 {
last_compacted_tokens + compact_threshold
} else {
compact_threshold
};
if est > threshold && conv.api_messages.len() >= 4 {
eprintln!("\x1b[2m[auto-compacting ~{} tokens...]\x1b[0m", est);
if let Ok(summary) = compact_conversation(&conv.api_messages, &runtime, None).await {
conv.api_messages = vec![json!({
"role": "user",
"content": format!("<context-summary>\n{}\n</context-summary>", summary)
})];
last_compacted_tokens = conv.estimate_tokens();
eprintln!("\x1b[2m[compacted → ~{} tokens]\x1b[0m", last_compacted_tokens);
}
}
}
conv.save().await;
let hook_event = synaps_cli::extensions::hooks::events::HookEvent::on_session_end(
&conv.session.id,
None, );
let _ = runtime.hook_bus().emit(&hook_event).await;
boot.background.shutdown();
eprintln!("session saved: {} (${:.4})", &conv.session.id[..8], conv.session_cost);
Ok(())
}