use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::signal;
use tokio::sync::{mpsc, RwLock};
use tokio::time::interval;
use tracing::{debug, error, info, warn};
use super::mcp_server::{AgentConfig, ClaudeCodeAgentMcpServer};
use super::quality_monitor::{QualityEvent, QualityMonitorConfig, QualityMonitorEngine};
use super::state_persistence::StatePersistence;
/// Agent daemon that owns the MCP server, the quality monitor, the state
/// persistence handle and the shared runtime [`DaemonState`].
///
/// A freshly constructed daemon holds no components; they are created by
/// `initialize_components` (called from `start`).
pub struct AgentDaemon {
/// Configuration supplied to `AgentDaemon::new`.
config: DaemonConfig,
/// MCP server; `None` until `initialize_components` creates it and again
/// after `shutdown_components` runs.
mcp_server: Option<ClaudeCodeAgentMcpServer>,
/// Quality monitor engine; `None` until `initialize_components` creates it
/// and again after `shutdown_components` runs.
quality_monitor: Option<QualityMonitorEngine>,
/// Runtime state shared with the background task that processes quality events.
state: Arc<RwLock<DaemonState>>,
/// State persistence handle; populated by `initialize_components`.
persistence: Option<StatePersistence>,
/// Sending half of the shutdown channel; created by `start` and used by `stop`.
shutdown_tx: Option<mpsc::Sender<()>>,
}
/// Top-level configuration consumed by [`AgentDaemon`].
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct DaemonConfig {
/// Agent settings; cloned into `ClaudeCodeAgentMcpServer::new`, and its
/// `version` is logged when the daemon starts.
pub agent: AgentConfig,
/// Settings cloned into `QualityMonitorEngine::new`.
pub quality_monitor: QualityMonitorConfig,
/// Settings for the daemon process itself (see [`DaemonSettings`]).
pub daemon: DaemonSettings,
}
/// Settings that control the daemon process itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaemonSettings {
/// Optional PID file path. NOTE(review): not read anywhere in the visible
/// code; confirm where (or whether) it is consumed.
pub pid_file: Option<PathBuf>,
/// Optional log file path. NOTE(review): not read anywhere in the visible
/// code; confirm where (or whether) it is consumed.
pub log_file: Option<PathBuf>,
/// Base directory; `initialize_components` places the `.pmat_state`
/// persistence directory beneath it.
pub working_directory: PathBuf,
/// Period of the health-check ticker in the main daemon loop.
pub health_check_interval: Duration,
/// Memory limit in MB; the main loop compares `DaemonState::memory_usage_mb`
/// against it after every health check.
pub max_memory_mb: u64,
/// When `true`, exceeding `max_memory_mb` makes the main loop exit.
pub auto_restart: bool,
/// Upper bound on how long `stop` waits for `shutdown_components`.
pub shutdown_timeout: Duration,
}
impl Default for DaemonSettings {
    /// Builds the default settings: no PID or log file, the current working
    /// directory (falling back to `"."` when it cannot be determined), a 30 s
    /// health-check interval, a 500 MB memory limit, auto-restart enabled and
    /// a 10 s shutdown timeout.
    fn default() -> Self {
        // `current_dir` can fail (e.g. the directory was removed); fall back
        // to a relative path instead of propagating the error.
        let working_directory = match std::env::current_dir() {
            Ok(dir) => dir,
            Err(_) => PathBuf::from("."),
        };

        Self {
            working_directory,
            pid_file: None,
            log_file: None,
            health_check_interval: Duration::from_secs(30),
            shutdown_timeout: Duration::from_secs(10),
            max_memory_mb: 500,
            auto_restart: true,
        }
    }
}
/// Snapshot of the daemon's runtime state, shared behind an `Arc<RwLock<_>>`
/// and returned by value from `AgentDaemon::get_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaemonState {
/// Current lifecycle status.
pub status: DaemonStatus,
/// Set when the daemon is constructed and overwritten when `start` is called.
pub started_at: SystemTime,
/// Time of the most recent health check.
pub last_health_check: SystemTime,
/// Number of active projects. NOTE(review): initialised to 0 and not
/// updated anywhere in the visible code.
pub active_projects: usize,
/// Count of quality events handled; incremented once per processed event.
pub events_processed: u64,
/// Memory usage in MB as written by the health check.
pub memory_usage_mb: u64,
/// Number of restarts. NOTE(review): initialised to 0 and not updated
/// anywhere in the visible code.
pub restart_count: u32,
/// Most recent error message recorded by the daemon, for example the
/// payload of a `QualityEvent::Error`.
pub last_error: Option<String>,
}
/// Lifecycle status of the daemon.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum DaemonStatus {
/// Set at the beginning of `AgentDaemon::start`, before components are initialized.
Starting,
/// Set by `AgentDaemon::start` once components have been initialized.
Running,
/// Set at the beginning of `AgentDaemon::stop`.
Stopping,
/// Initial status of a new daemon and the status after `AgentDaemon::stop` finishes.
Stopped,
/// Failure status. NOTE(review): presumably marks a daemon that hit an
/// unrecoverable error; confirm where it is assigned.
Error,
}
impl AgentDaemon {
/// Creates a daemon in the `Stopped` state with no components initialized.
///
/// All counters start at zero, both timestamps are taken at construction
/// time, and no shutdown channel exists until `start` is called.
#[must_use]
pub fn new(config: DaemonConfig) -> Self {
    let initial_state = DaemonState {
        status: DaemonStatus::Stopped,
        started_at: SystemTime::now(),
        last_health_check: SystemTime::now(),
        last_error: None,
        active_projects: 0,
        events_processed: 0,
        memory_usage_mb: 0,
        restart_count: 0,
    };

    Self {
        config,
        state: Arc::new(RwLock::new(initial_state)),
        mcp_server: None,
        quality_monitor: None,
        persistence: None,
        shutdown_tx: None,
    }
}
pub async fn start(&mut self) -> Result<()> {
info!(
"Starting Claude Code Agent Daemon v{}",
self.config.agent.version
);
{
let mut state = self.state.write().await;
state.status = DaemonStatus::Starting;
state.started_at = SystemTime::now();
}
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
self.shutdown_tx = Some(shutdown_tx);
self.initialize_components().await?;
{
let mut state = self.state.write().await;
state.status = DaemonStatus::Running;
}
info!("Claude Code Agent Daemon started successfully");
self.run_daemon_loop(shutdown_rx).await
}
/// Stops the daemon: marks it `Stopping`, notifies the main loop through the
/// shutdown channel (if one exists), shuts the components down within
/// `shutdown_timeout`, and finally marks the daemon `Stopped`.
///
/// Shutdown errors and timeouts are logged as warnings, not returned, so
/// this method always yields `Ok(())`.
pub async fn stop(&mut self) -> Result<()> {
    info!("Stopping Claude Code Agent Daemon");
    self.state.write().await.status = DaemonStatus::Stopping;

    // A failed send only means the main loop is no longer listening.
    if let Some(sender) = self.shutdown_tx.as_ref() {
        let _ = sender.send(()).await;
    }

    let limit = self.config.daemon.shutdown_timeout;
    match tokio::time::timeout(limit, self.shutdown_components()).await {
        Ok(Ok(())) => {}
        Ok(Err(e)) => {
            warn!("Error during graceful shutdown: {}", e);
        }
        Err(_) => {
            warn!("Shutdown timeout exceeded, forcing stop");
        }
    }

    self.state.write().await.status = DaemonStatus::Stopped;
    info!("Claude Code Agent Daemon stopped");
    Ok(())
}
pub async fn get_state(&self) -> DaemonState {
self.state.read().await.clone()
}
/// Creates the quality monitor, the MCP server and the state persistence
/// handle, stores them on `self`, and spawns the background task that feeds
/// quality events into the shared daemon state.
///
/// # Errors
///
/// Returns the error from `StatePersistence::new` if the persistence handle
/// cannot be created; in that case no component is stored and no task is spawned.
async fn initialize_components(&mut self) -> Result<()> {
    info!("Initializing daemon components");

    // The monitor publishes events on this channel; the spawned task below
    // is the only consumer.
    let mut monitor = QualityMonitorEngine::new(self.config.quality_monitor.clone());
    let (event_tx, mut event_rx) = mpsc::channel(100);
    monitor.set_event_sender(event_tx);

    let server = ClaudeCodeAgentMcpServer::new(self.config.agent.clone());

    let state_dir = self.config.daemon.working_directory.join(".pmat_state");
    let store = StatePersistence::new(&state_dir)?;
    store.start_auto_save().await;
    let restored = store.get_state().await;
    info!(
        "Restored {} monitored projects from persistent state",
        restored.monitored_projects.len()
    );

    self.quality_monitor = Some(monitor);
    self.mcp_server = Some(server);
    self.persistence = Some(store);

    let shared_state = Arc::clone(&self.state);
    tokio::spawn(async move {
        // Ends on its own once every sender of the event channel is dropped.
        while let Some(event) = event_rx.recv().await {
            Self::process_quality_event(event, &shared_state).await;
        }
    });

    Ok(())
}
async fn run_daemon_loop(&mut self, mut shutdown_rx: mpsc::Receiver<()>) -> Result<()> {
info!("Starting main daemon loop");
let mut health_check_interval = interval(self.config.daemon.health_check_interval);
let _state = self.state.clone();
let max_memory_mb = self.config.daemon.max_memory_mb;
if let Some(_mcp_server) = self.mcp_server.as_mut() {
info!("Starting MCP server");
}
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
info!("Shutdown signal received");
break;
}
_ = health_check_interval.tick() => {
self.perform_health_check().await;
let current_state = self.state.read().await;
if current_state.memory_usage_mb > max_memory_mb {
warn!("Memory usage {} MB exceeds limit {} MB",
current_state.memory_usage_mb, max_memory_mb);
if self.config.daemon.auto_restart {
warn!("Triggering auto-restart due to high memory usage");
break;
}
}
}
_ = signal::ctrl_c() => {
info!("SIGINT received, initiating graceful shutdown");
break;
}
_ = async {
#[cfg(unix)]
{
signal::unix::signal(signal::unix::SignalKind::terminate()).unwrap().recv().await
}
#[cfg(not(unix))]
{
std::future::pending::<()>().await;
unreachable!()
}
} => {
info!("SIGTERM received, initiating graceful shutdown");
break;
}
}
}
Ok(())
}
/// Records a health check: refreshes `last_health_check`, writes the memory
/// usage figure, and logs a debug line when the daemon is `Running`.
async fn perform_health_check(&self) {
    debug!("Performing daemon health check");

    let mut snapshot = self.state.write().await;
    snapshot.last_health_check = SystemTime::now();
    // No real measurement is taken: the same constant is reported on every platform.
    snapshot.memory_usage_mb = 150;

    if snapshot.status != DaemonStatus::Running {
        return;
    }
    debug!(
        "Health check passed: {} MB memory, {} active projects",
        snapshot.memory_usage_mb, snapshot.active_projects
    );
}
async fn process_quality_event(event: QualityEvent, state: &Arc<RwLock<DaemonState>>) {
debug!("Processing quality event: {:?}", event);
let mut daemon_state = state.write().await;
daemon_state.events_processed += 1;
match event {
QualityEvent::MetricsUpdated { project_id, .. } => {
debug!("Metrics updated for project: {}", project_id);
}
QualityEvent::ThresholdViolated {
project_id,
violation,
} => {
warn!(
"Quality threshold violated in project {}: {:?}",
project_id, violation
);
}
QualityEvent::FileAnalyzed {
project_id,
file_path,
..
} => {
debug!("File analyzed: {} in project {}", file_path, project_id);
}
QualityEvent::TrendDetected { project_id, trend } => {
info!(
"Quality trend detected in project {}: {:?}",
project_id, trend
);
}
QualityEvent::Error { project_id, error } => {
error!(
"Quality monitoring error in project {}: {}",
project_id, error
);
daemon_state.last_error = Some(error);
}
}
}
/// Logs the shutdown of each initialized component and then releases the
/// quality monitor and the MCP server by resetting both fields to `None`.
///
/// Always returns `Ok(())`.
async fn shutdown_components(&mut self) -> Result<()> {
    info!("Shutting down daemon components");

    if self.quality_monitor.is_some() {
        info!("Stopping quality monitor");
    }
    if self.mcp_server.is_some() {
        info!("Stopping MCP server");
    }

    // Drop the components only after both log lines, monitor first.
    drop(self.quality_monitor.take());
    drop(self.mcp_server.take());
    Ok(())
}
}
pub struct DaemonManager;
impl DaemonManager {
pub async fn is_running() -> bool {
false
}
pub async fn get_status() -> Result<DaemonState> {
Ok(DaemonState {
status: DaemonStatus::Stopped,
started_at: SystemTime::now(),
last_health_check: SystemTime::now(),
active_projects: 0,
events_processed: 0,
memory_usage_mb: 0,
restart_count: 0,
last_error: None,
})
}
pub async fn send_command(command: DaemonCommand) -> Result<()> {
match command {
DaemonCommand::GetStatus => {
info!("Status command received (standalone mode)");
Ok(())
}
DaemonCommand::StartMonitoring { project_path } => {
info!(
"Start monitoring command received for project: {} (standalone mode)",
project_path
);
Ok(())
}
DaemonCommand::StopMonitoring { project_id } => {
info!(
"Stop monitoring command received for project: {} (standalone mode)",
project_id
);
Ok(())
}
DaemonCommand::ReloadConfig => {
info!("Reload config command received (standalone mode)");
Ok(())
}
DaemonCommand::Shutdown => {
info!("Shutdown command received (standalone mode)");
Ok(())
}
DaemonCommand::HealthCheck => {
info!("Health check command received (standalone mode)");
Ok(())
}
}
}
pub async fn shutdown() -> Result<()> {
info!("Shutting down daemon...");
Ok(())
}
pub async fn start_monitoring(_project_path: &Path, _project_id: &str) -> Result<()> {
info!("Starting monitoring for project at {:?}", _project_path);
Ok(())
}
pub async fn stop_monitoring(_project_id: &str) -> Result<()> {
info!("Stopping monitoring for project {}", _project_id);
Ok(())
}
pub async fn get_health_info() -> Result<serde_json::Value> {
info!("Getting detailed health information");
Ok(serde_json::json!({
"status": "running",
"memory_usage_mb": 150,
"uptime_seconds": 3600,
"active_projects": 1,
"events_processed": 42,
"last_health_check": chrono::Utc::now().to_rfc3339()
}))
}
pub async fn reload_config(_config_path: Option<&PathBuf>) -> Result<()> {
info!("Reloading daemon configuration");
Ok(())
}
pub async fn run_quality_gate(_project: &str) -> Result<QualityGateResult> {
info!("Running quality gate for project {}", _project);
Ok(QualityGateResult {
violations: Some(0),
passed: true,
})
}
}
/// Outcome returned by `DaemonManager::run_quality_gate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityGateResult {
/// Number of violations, when available. NOTE(review): the meaning of
/// `None` is not established by the visible code; confirm with callers.
pub violations: Option<u32>,
/// Whether the quality gate passed.
pub passed: bool,
}
/// Commands accepted by `DaemonManager::send_command`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DaemonCommand {
/// Request the daemon status.
GetStatus,
/// Start monitoring the project identified by `project_path`.
StartMonitoring { project_path: String },
/// Stop monitoring the project identified by `project_id`.
StopMonitoring { project_id: String },
/// Reload the daemon configuration.
ReloadConfig,
/// Request a health check.
HealthCheck,
/// Shut the daemon down.
Shutdown,
}
#[cfg(test)]
mod tests {
    use super::*;

    // Default configuration exposes the expected agent name and daemon limits.
    #[test]
    fn test_daemon_config_default() {
        let config = DaemonConfig::default();
        assert_eq!(config.agent.name, "pmat-agent");
        assert_eq!(config.daemon.max_memory_mb, 500);
        assert!(config.daemon.auto_restart);
    }

    // A hand-built state keeps the values it was given.
    #[test]
    fn test_daemon_state_creation() {
        let state = DaemonState {
            status: DaemonStatus::Running,
            started_at: SystemTime::now(),
            last_health_check: SystemTime::now(),
            active_projects: 3,
            events_processed: 150,
            memory_usage_mb: 200,
            restart_count: 0,
            last_error: None,
        };
        assert_eq!(state.status, DaemonStatus::Running);
        assert_eq!(state.active_projects, 3);
        assert_eq!(state.memory_usage_mb, 200);
    }

    // A new daemon starts out stopped with no active projects.
    #[tokio::test]
    async fn test_daemon_creation() {
        let config = DaemonConfig::default();
        let daemon = AgentDaemon::new(config);
        let state = daemon.get_state().await;
        assert_eq!(state.status, DaemonStatus::Stopped);
        assert_eq!(state.active_projects, 0);
    }

    // Status values survive a JSON round trip.
    #[test]
    fn test_daemon_status_serialization() {
        let status = DaemonStatus::Running;
        let json = serde_json::to_string(&status).unwrap();
        let deserialized: DaemonStatus = serde_json::from_str(&json).unwrap();
        assert_eq!(status, deserialized);
    }

    // The manager stub never reports a running daemon.
    #[tokio::test]
    async fn test_daemon_manager() {
        let is_running = DaemonManager::is_running().await;
        assert!(!is_running);
    }

    // The manager stub reports a stopped daemon with zeroed counters.
    #[tokio::test]
    async fn test_daemon_get_status() {
        let status = DaemonManager::get_status().await;
        assert!(status.is_ok());
        let state = status.unwrap();
        assert_eq!(state.status, DaemonStatus::Stopped);
        assert_eq!(state.active_projects, 0);
        assert_eq!(state.memory_usage_mb, 0);
        assert_eq!(state.events_processed, 0);
    }

    #[tokio::test]
    async fn test_daemon_send_command_get_status() {
        let result = DaemonManager::send_command(DaemonCommand::GetStatus).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_daemon_send_command_start_monitoring() {
        let result = DaemonManager::send_command(DaemonCommand::StartMonitoring {
            project_path: "test-project".to_string(),
        })
        .await;
        assert!(result.is_ok());
    }

    // Every `DaemonCommand` variant must be accepted. Keep this list in sync
    // with the enum: `HealthCheck` was previously missing from it.
    #[tokio::test]
    async fn test_daemon_send_command_all_variants() {
        let commands = vec![
            DaemonCommand::GetStatus,
            DaemonCommand::StartMonitoring {
                project_path: "proj1".to_string(),
            },
            DaemonCommand::StopMonitoring {
                project_id: "proj2".to_string(),
            },
            DaemonCommand::ReloadConfig,
            DaemonCommand::HealthCheck,
            DaemonCommand::Shutdown,
        ];
        for command in commands {
            let result = DaemonManager::send_command(command).await;
            assert!(result.is_ok(), "Command should be handled successfully");
        }
    }
}
#[cfg(test)]
mod property_tests {
    use proptest::prelude::*;

    proptest! {
        // Smoke test: the harness runs for arbitrary string inputs.
        #[test]
        fn basic_property_stability(_text in ".*") {
            prop_assert!(true);
        }

        // Values drawn from `0..1000` never exceed 1000.
        #[test]
        fn module_consistency_check(value in 0u32..1000) {
            prop_assert!(value <= 1000);
        }
    }
}