use synaps_cli::{Runtime, StreamEvent, LlmEvent, SessionEvent};
use synaps_cli::core::compaction::compact_conversation;
use futures::StreamExt;
use serde_json::{json, Value};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio_util::sync::CancellationToken;
fn estimate_tokens(messages: &[Value]) -> usize {
let mut total_chars = 0usize;
for msg in messages {
if let Some(s) = msg["content"].as_str() {
total_chars += s.len();
} else if let Some(arr) = msg["content"].as_array() {
for block in arr {
if let Some(s) = block["text"].as_str() {
total_chars += s.len();
}
if let Some(s) = block["thinking"].as_str() {
total_chars += s.len();
}
if let Some(s) = block["content"].as_str() {
total_chars += s.len();
} else if let Some(content_arr) = block["content"].as_array() {
for inner in content_arr {
if let Some(s) = inner["text"].as_str() {
total_chars += s.len();
}
}
}
if let Some(input) = block.get("input") {
total_chars += input.to_string().len();
}
}
}
}
total_chars / 4
}
fn load_agent_prompt(name: &str) -> std::result::Result<String, String> {
synaps_cli::tools::resolve_agent_prompt(name)
}
fn log(msg: &str) {
let ts = chrono::Local::now().format("%H:%M:%S");
eprintln!("[{}] {}", ts, msg);
}
pub async fn run(
agent: Option<String>,
system: Option<String>,
name: Option<String>,
model: Option<String>,
thinking: Option<String>,
compact_at: usize,
) -> synaps_cli::Result<()> {
let _log_guard = synaps_cli::logging::init_logging();
let mut runtime = Runtime::new().await?;
let display_name = if let Some(ref agent_name) = agent {
match load_agent_prompt(agent_name) {
Ok(p) => {
runtime.set_system_prompt(p);
agent_name.clone()
}
Err(e) => {
eprintln!("❌ {}", e);
std::process::exit(1);
}
}
} else if let Some(ref val) = system {
let prompt = synaps_cli::config::resolve_system_prompt(Some(val));
runtime.set_system_prompt(prompt);
"daemon".to_string()
} else {
eprintln!("❌ Either --agent or --system is required.");
std::process::exit(1);
};
if let Some(ref m) = model {
runtime.set_model(m.clone());
}
if let Some(ref t) = thinking {
let budget = match t.as_str() {
"low" => 2048,
"medium" => 4096,
"high" => 16384,
"xhigh" => 32768,
other => other.parse::<u32>().unwrap_or(4096),
};
runtime.set_thinking_budget(budget);
}
let session_id = format!(
"{}-{}-{}",
display_name,
chrono::Utc::now().format("%Y%m%d-%H%M%S"),
std::process::id()
);
let session_name = name.or_else(|| Some(display_name.clone()));
log(&format!("booting daemon [{}] (model: {})", display_name, runtime.model()));
let socket_shutdown = Arc::new(AtomicBool::new(false));
let socket_path = synaps_cli::events::registry::socket_path_for_session(&session_id);
let socket_task = synaps_cli::events::socket::listen_session_socket(
socket_path.clone(),
runtime.event_queue().clone(),
socket_shutdown.clone(),
);
let registration = synaps_cli::events::registry::SessionRegistration {
session_id: session_id.clone(),
name: session_name.clone(),
socket_path: socket_path.clone(),
pid: std::process::id(),
started_at: chrono::Utc::now(),
};
if let Err(e) = synaps_cli::events::registry::register_session(®istration) {
log(&format!("WARNING: failed to register session: {}", e));
}
let inbox_shutdown = Arc::new(AtomicBool::new(false));
let inbox_task = {
let inbox_dir = synaps_cli::config::base_dir().join("inbox");
let eq = runtime.event_queue().clone();
let sd = inbox_shutdown.clone();
tokio::spawn(async move {
synaps_cli::events::watch_inbox(inbox_dir, eq, sd).await;
})
};
let interrupted = Arc::new(AtomicBool::new(false));
{
let flag = interrupted.clone();
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
flag.store(true, Ordering::Release);
});
}
#[cfg(unix)]
{
let flag = interrupted.clone();
tokio::spawn(async move {
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
Ok(mut sigterm) => {
sigterm.recv().await;
flag.store(true, Ordering::Release);
}
Err(e) => log(&format!("WARNING: failed to register SIGTERM handler: {e}")),
}
});
let flag = interrupted.clone();
tokio::spawn(async move {
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup()) {
Ok(mut sighup) => {
sighup.recv().await;
flag.store(true, Ordering::Release);
}
Err(e) => log(&format!("WARNING: failed to register SIGHUP handler: {e}")),
}
});
}
let mut messages: Vec<Value> = Vec::new();
let mut last_compacted_tokens: usize = 0;
log(&format!(
"ready — listening on {} (name: {})",
socket_path,
session_name.as_deref().unwrap_or("none")
));
loop {
tokio::select! {
_ = runtime.event_queue().notified() => {
let mut event_count = 0;
while let Some(event) = runtime.event_queue().pop() {
event_count += 1;
let formatted = synaps_cli::events::format_event_for_agent(&event);
log(&format!(
"event [{}/{}]: {}",
event.source.source_type,
event.content.severity.as_ref().map(|s| s.as_str()).unwrap_or("medium"),
&event.content.text
));
messages.push(json!({
"role": "user",
"content": formatted
}));
}
if event_count == 0 {
continue;
}
log(&format!("processing {} event(s)...", event_count));
let cancel = CancellationToken::new();
let mut stream = runtime.run_stream_with_messages(
std::mem::take(&mut messages),
cancel,
None,
None,
).await;
while let Some(event) = stream.next().await {
match event {
StreamEvent::Llm(LlmEvent::Text(text)) => {
if !text.is_empty() {
eprint!("{}", text);
}
}
StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name: name, .. }) => {
log(&format!(" tool: {}", name));
}
StreamEvent::Llm(LlmEvent::ToolResult { result, .. }) => {
let preview: String = result.chars().take(100).collect();
log(&format!(" result: {}", preview));
}
StreamEvent::Session(SessionEvent::Usage {
input_tokens,
output_tokens,
..
}) => {
log(&format!(" tokens: +{}↑ +{}↓", input_tokens, output_tokens));
}
StreamEvent::Session(SessionEvent::MessageHistory(history)) => {
messages = history;
}
StreamEvent::Session(SessionEvent::Done) => {
break;
}
StreamEvent::Session(SessionEvent::Error(e)) => {
log(&format!(" ERROR: {}", e));
break;
}
_ => {}
}
}
eprintln!();
let est = estimate_tokens(&messages);
let effective_threshold = if last_compacted_tokens > 0 {
last_compacted_tokens + compact_at
} else {
compact_at
};
if est > effective_threshold && messages.len() >= 4 {
log(&format!(
"token estimate {} exceeds threshold {} — compacting {} messages...",
est, effective_threshold, messages.len()
));
match tokio::time::timeout(
std::time::Duration::from_secs(90),
compact_conversation(&messages, &runtime, None),
).await {
Ok(Ok(summary)) => {
let summary_tokens = summary.len() / 4;
log(&format!(
"compacted ~{} tokens → ~{} token summary",
est, summary_tokens
));
messages = vec![json!({
"role": "user",
"content": format!(
"<context-summary>\n{}\n</context-summary>",
summary
)
})];
last_compacted_tokens = estimate_tokens(&messages);
}
Ok(Err(e)) => {
log(&format!("compaction failed: {} — continuing with full history", e));
}
Err(_) => {
log("compaction timed out (90s) — continuing with full history");
}
}
}
log("idle — waiting for events...");
if !runtime.event_queue().is_empty() {
continue;
}
}
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
if interrupted.load(Ordering::Acquire) {
log("interrupted — shutting down");
break;
}
if !runtime.event_queue().is_empty() {
continue;
}
}
}
}
socket_shutdown.store(true, Ordering::Release);
inbox_shutdown.store(true, Ordering::Release);
let _ = tokio::join!(socket_task, inbox_task);
synaps_cli::events::registry::unregister_session(&session_id);
log("daemon stopped.");
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn estimate_string_content() {
let msgs = vec![json!({"role": "user", "content": "aaaa"})]; assert_eq!(estimate_tokens(&msgs), 1);
}
#[test]
fn estimate_empty_messages() {
assert_eq!(estimate_tokens(&[]), 0);
}
#[test]
fn estimate_array_content_text_and_thinking() {
let msgs = vec![json!({"role": "assistant", "content": [
{"type": "text", "text": "aaaaaaaa"}, {"type": "thinking", "thinking": "aaaa"}, ]})];
assert_eq!(estimate_tokens(&msgs), 3); }
#[test]
fn estimate_tool_result_string() {
let msgs = vec![json!({"role": "user", "content": [
{"type": "tool_result", "content": "aaaaaaaaaaaa"} ]})];
assert_eq!(estimate_tokens(&msgs), 3); }
#[test]
fn estimate_tool_result_array_mcp() {
let msgs = vec![json!({"role": "user", "content": [
{"type": "tool_result", "content": [{"text": "aaaaaaaaaaaaaaaa"}]} ]})];
assert_eq!(estimate_tokens(&msgs), 4); }
#[test]
fn estimate_tool_use_input() {
let msgs = vec![json!({"role": "assistant", "content": [
{"type": "tool_use", "input": {"key": "val"}}
]})];
let est = estimate_tokens(&msgs);
assert!(est > 0, "tool_use input should contribute tokens");
}
fn should_compact(est: usize, last_compacted: usize, compact_at: usize, msg_count: usize) -> bool {
let threshold = if last_compacted > 0 { last_compacted + compact_at } else { compact_at };
est > threshold && msg_count >= 4
}
#[test]
fn first_compaction_triggers() {
assert!(should_compact(90000, 0, 80000, 10));
}
#[test]
fn post_compact_hysteresis_suppresses() {
assert!(!should_compact(50000, 5000, 80000, 10));
}
#[test]
fn hysteresis_allows_after_growth() {
assert!(should_compact(90000, 5000, 80000, 10));
}
#[test]
fn too_few_messages_blocks_compaction() {
assert!(!should_compact(90000, 0, 80000, 2));
}
}