use anyhow::Result;
use clap::{Args, Subcommand};
use std::fs;
use std::path::PathBuf;
#[cfg(unix)]
use daemonize::Daemonize;
use localgpt::concurrency::TurnGate;
use localgpt::config::Config;
use localgpt::heartbeat::HeartbeatRunner;
use localgpt::memory::MemoryManager;
use localgpt::server::Server;
pub fn stop_sync() -> Result<()> {
let pid_file = get_pid_file()?;
if !pid_file.exists() {
println!("Daemon is not running");
return Ok(());
}
let pid = fs::read_to_string(&pid_file)?.trim().to_string();
if !is_process_running(&pid) {
println!("Daemon is not running (stale PID file)");
fs::remove_file(&pid_file)?;
return Ok(());
}
println!("Stopping daemon (PID: {})...", pid);
#[cfg(unix)]
{
use std::process::Command;
Command::new("kill").args(["-TERM", &pid]).status()?;
}
#[cfg(windows)]
{
use std::process::Command;
Command::new("taskkill").args(["/PID", &pid]).status()?;
}
for _ in 0..50 {
if !is_process_running(&pid) {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
if is_process_running(&pid) {
anyhow::bail!("Failed to stop daemon (PID: {})", pid);
}
println!("Daemon stopped");
fs::remove_file(&pid_file).ok();
Ok(())
}
#[cfg(unix)]
pub fn daemonize_and_run(agent_id: &str) -> Result<()> {
let config = Config::load()?;
let pid_file = get_pid_file()?;
if pid_file.exists() {
let pid = fs::read_to_string(&pid_file)?;
if is_process_running(&pid) {
anyhow::bail!("Daemon already running (PID: {})", pid.trim());
}
fs::remove_file(&pid_file)?;
}
let log_file = get_log_file(config.logging.retention_days)?;
println!(
"Starting LocalGPT daemon in background (agent: {})...",
agent_id
);
println!(" PID file: {}", pid_file.display());
println!(" Log file: {}", log_file.display());
if config.server.enabled {
println!(
" Server: http://{}:{}",
config.server.bind, config.server.port
);
}
println!("\nUse 'localgpt daemon status' to check status");
println!("Use 'localgpt daemon stop' to stop\n");
let stdout = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&log_file)?;
let stderr = stdout.try_clone()?;
let daemonize = Daemonize::new()
.pid_file(&pid_file)
.working_directory(std::env::current_dir()?)
.stdout(stdout)
.stderr(stderr);
match daemonize.start() {
Ok(_) => {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(run_daemon_server(config, agent_id))
}
Err(e) => anyhow::bail!("Failed to daemonize: {}", e),
}
}
async fn run_daemon_server(config: Config, agent_id: &str) -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::new("info"))
.with_ansi(false)
.init();
let memory = MemoryManager::new_with_full_config(&config.memory, Some(&config), agent_id)?;
let _watcher = memory.start_watcher()?;
println!("Daemon started successfully");
run_daemon_services(&config, agent_id).await?;
println!("\nShutting down...");
let pid_file = get_pid_file()?;
fs::remove_file(&pid_file).ok();
Ok(())
}
async fn run_daemon_services(config: &Config, agent_id: &str) -> Result<()> {
let turn_gate = TurnGate::new();
let heartbeat_handle = if config.heartbeat.enabled {
let heartbeat_config = config.clone();
let heartbeat_agent_id = agent_id.to_string();
let heartbeat_gate = turn_gate.clone();
println!(
" Heartbeat: enabled (interval: {})",
config.heartbeat.interval
);
Some(tokio::spawn(async move {
match HeartbeatRunner::new_with_gate(
&heartbeat_config,
&heartbeat_agent_id,
Some(heartbeat_gate),
) {
Ok(runner) => {
if let Err(e) = runner.run().await {
tracing::error!("Heartbeat runner error: {}", e);
}
}
Err(e) => {
tracing::error!("Failed to create heartbeat runner: {}", e);
}
}
}))
} else {
None
};
let telegram_handle = if config.telegram.as_ref().is_some_and(|t| t.enabled) {
let tg_config = config.clone();
let tg_gate = turn_gate.clone();
println!(" Telegram: enabled");
Some(tokio::spawn(async move {
if let Err(e) = localgpt::server::telegram::run_telegram_bot(&tg_config, tg_gate).await
{
tracing::error!("Telegram bot error: {}", e);
}
}))
} else {
None
};
if config.server.enabled {
println!(
" Server: http://{}:{}",
config.server.bind, config.server.port
);
let server = Server::new_with_gate(config, turn_gate)?;
server.run().await?;
} else if heartbeat_handle.is_some() {
println!(" Server: disabled");
tokio::signal::ctrl_c().await?;
} else {
println!(" Neither server nor heartbeat is enabled. Use Ctrl+C to stop.");
tokio::signal::ctrl_c().await?;
}
if let Some(handle) = heartbeat_handle {
handle.abort();
}
if let Some(handle) = telegram_handle {
handle.abort();
}
Ok(())
}
#[derive(Args)]
pub struct DaemonArgs {
#[command(subcommand)]
pub command: DaemonCommands,
}
#[derive(Subcommand)]
pub enum DaemonCommands {
Start {
#[arg(short, long)]
foreground: bool,
},
Stop,
Restart {
#[arg(short, long)]
foreground: bool,
},
Status,
Heartbeat,
}
pub async fn run(args: DaemonArgs, agent_id: &str) -> Result<()> {
match args.command {
DaemonCommands::Start { foreground } => start_daemon(foreground, agent_id).await,
DaemonCommands::Stop => stop_daemon().await,
DaemonCommands::Restart { foreground } => restart_daemon(foreground, agent_id).await,
DaemonCommands::Status => show_status().await,
DaemonCommands::Heartbeat => run_heartbeat_once(agent_id).await,
}
}
async fn start_daemon(foreground: bool, agent_id: &str) -> Result<()> {
let config = Config::load()?;
let pid_file = get_pid_file()?;
if pid_file.exists() {
let pid = fs::read_to_string(&pid_file)?;
if is_process_running(&pid) {
anyhow::bail!("Daemon already running (PID: {})", pid.trim());
}
fs::remove_file(&pid_file)?;
}
#[cfg(unix)]
if !foreground {
anyhow::bail!("Background mode should be handled before Tokio starts");
}
#[cfg(not(unix))]
if !foreground {
println!(
"Note: Background daemonization not supported on this platform. Running in foreground."
);
}
println!(
"Starting LocalGPT daemon in foreground (agent: {})...",
agent_id
);
fs::write(&pid_file, std::process::id().to_string())?;
let memory = MemoryManager::new_with_full_config(&config.memory, Some(&config), agent_id)?;
let _watcher = memory.start_watcher()?;
println!("Daemon started successfully");
run_daemon_services(&config, agent_id).await?;
println!("\nShutting down...");
fs::remove_file(&pid_file).ok();
Ok(())
}
async fn stop_daemon() -> Result<()> {
let pid_file = get_pid_file()?;
if !pid_file.exists() {
println!("Daemon is not running");
return Ok(());
}
let pid = fs::read_to_string(&pid_file)?.trim().to_string();
if !is_process_running(&pid) {
println!("Daemon is not running (stale PID file)");
fs::remove_file(&pid_file)?;
return Ok(());
}
#[cfg(unix)]
{
use std::process::Command;
Command::new("kill").args(["-TERM", &pid]).status()?;
}
#[cfg(windows)]
{
use std::process::Command;
Command::new("taskkill").args(["/PID", &pid]).status()?;
}
println!("Sent stop signal to daemon (PID: {})", pid);
fs::remove_file(&pid_file)?;
Ok(())
}
async fn restart_daemon(foreground: bool, agent_id: &str) -> Result<()> {
let pid_file = get_pid_file()?;
if pid_file.exists() {
let pid = fs::read_to_string(&pid_file)?.trim().to_string();
if is_process_running(&pid) {
println!("Stopping daemon (PID: {})...", pid);
#[cfg(unix)]
{
use std::process::Command;
Command::new("kill").args(["-TERM", &pid]).status()?;
}
#[cfg(windows)]
{
use std::process::Command;
Command::new("taskkill").args(["/PID", &pid]).status()?;
}
for _ in 0..50 {
if !is_process_running(&pid) {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
if is_process_running(&pid) {
anyhow::bail!("Failed to stop daemon (PID: {})", pid);
}
println!("Daemon stopped");
}
fs::remove_file(&pid_file).ok();
}
#[cfg(unix)]
if !foreground {
println!("\nTo start daemon in background, run: localgpt daemon start");
println!("(Background restart requires re-running the command due to fork requirements)");
return Ok(());
}
println!();
start_daemon(foreground, agent_id).await
}
async fn show_status() -> Result<()> {
let config = Config::load()?;
let pid_file = get_pid_file()?;
let running = if pid_file.exists() {
let pid = fs::read_to_string(&pid_file)?;
is_process_running(&pid)
} else {
false
};
println!("LocalGPT Daemon Status");
println!("----------------------");
println!("Running: {}", if running { "yes" } else { "no" });
if running {
let pid = fs::read_to_string(&pid_file)?;
println!("PID: {}", pid.trim());
}
println!("\nConfiguration:");
println!(" Heartbeat enabled: {}", config.heartbeat.enabled);
if config.heartbeat.enabled {
println!(" Heartbeat interval: {}", config.heartbeat.interval);
}
println!(" Server enabled: {}", config.server.enabled);
if config.server.enabled {
println!(
" Server address: http://{}:{}",
config.server.bind, config.server.port
);
}
Ok(())
}
async fn run_heartbeat_once(agent_id: &str) -> Result<()> {
let config = Config::load()?;
let runner = HeartbeatRunner::new_with_agent(&config, agent_id)?;
println!("Running heartbeat (agent: {})...", agent_id);
let result = runner.run_once().await?;
if result == "HEARTBEAT_OK" {
println!("Heartbeat completed: No tasks needed attention");
} else {
println!("Heartbeat response:\n{}", result);
}
Ok(())
}
fn get_pid_file() -> Result<PathBuf> {
let state_dir = localgpt::agent::get_state_dir()?;
Ok(state_dir.join("daemon.pid"))
}
fn get_log_file(retention_days: u32) -> Result<PathBuf> {
let state_dir = localgpt::agent::get_state_dir()?;
let logs_dir = state_dir.join("logs");
fs::create_dir_all(&logs_dir)?;
if retention_days > 0 {
prune_old_logs(&logs_dir, retention_days as i64);
}
let date = chrono::Local::now().format("%Y-%m-%d");
Ok(logs_dir.join(format!("localgpt-{}.log", date)))
}
fn prune_old_logs(logs_dir: &std::path::Path, keep_days: i64) {
let cutoff = chrono::Local::now() - chrono::Duration::days(keep_days);
let cutoff_date = cutoff.format("%Y-%m-%d").to_string();
if let Ok(entries) = fs::read_dir(logs_dir) {
for entry in entries.flatten() {
let name = entry.file_name();
let name_str = name.to_string_lossy();
if name_str.starts_with("localgpt-")
&& name_str.ends_with(".log")
&& let Some(date_part) = name_str
.strip_prefix("localgpt-")
.and_then(|s| s.strip_suffix(".log"))
&& date_part < cutoff_date.as_str()
{
let _ = fs::remove_file(entry.path());
}
}
}
}
fn is_process_running(pid: &str) -> bool {
let pid = pid.trim();
#[cfg(unix)]
{
use std::process::Command;
Command::new("kill")
.args(["-0", pid])
.status()
.map(|s| s.success())
.unwrap_or(false)
}
#[cfg(windows)]
{
use std::process::Command;
Command::new("tasklist")
.args(["/FI", &format!("PID eq {}", pid)])
.output()
.map(|o| String::from_utf8_lossy(&o.stdout).contains(pid))
.unwrap_or(false)
}
}