use anyhow::Result;
use chrono::{Local, NaiveTime};
use std::fs;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use tracing::{debug, info, warn};
use super::events::{HeartbeatEvent, HeartbeatStatus, emit_heartbeat_event, now_ms};
use crate::agent::{
Agent, AgentConfig, HEARTBEAT_OK_TOKEN, SessionStore, build_heartbeat_prompt, is_heartbeat_ok,
};
use crate::concurrency::{TurnGate, WorkspaceLock};
use crate::config::{Config, parse_duration, parse_time};
use crate::memory::MemoryManager;
pub struct HeartbeatRunner {
config: Config,
interval: Duration,
active_hours: Option<(NaiveTime, NaiveTime)>,
workspace: PathBuf,
agent_id: String,
memory: MemoryManager,
turn_gate: Option<TurnGate>,
workspace_lock: WorkspaceLock,
}
impl HeartbeatRunner {
pub fn new(config: &Config) -> Result<Self> {
Self::new_with_agent(config, "main")
}
pub fn new_with_agent(config: &Config, agent_id: &str) -> Result<Self> {
Self::new_with_gate(config, agent_id, None)
}
pub fn new_with_gate(
config: &Config,
agent_id: &str,
turn_gate: Option<TurnGate>,
) -> Result<Self> {
let interval = parse_duration(&config.heartbeat.interval)
.map_err(|e| anyhow::anyhow!("Invalid heartbeat interval: {}", e))?;
let active_hours = if let Some(ref hours) = config.heartbeat.active_hours {
let (start_h, start_m) = parse_time(&hours.start)
.map_err(|e| anyhow::anyhow!("Invalid start time: {}", e))?;
let (end_h, end_m) =
parse_time(&hours.end).map_err(|e| anyhow::anyhow!("Invalid end time: {}", e))?;
Some((
NaiveTime::from_hms_opt(start_h as u32, start_m as u32, 0).unwrap(),
NaiveTime::from_hms_opt(end_h as u32, end_m as u32, 0).unwrap(),
))
} else {
None
};
let workspace = config.workspace_path();
let memory = MemoryManager::new_with_full_config(&config.memory, Some(config), agent_id)?;
let workspace_lock = WorkspaceLock::new()?;
Ok(Self {
config: config.clone(),
interval,
active_hours,
workspace,
agent_id: agent_id.to_string(),
memory,
turn_gate,
workspace_lock,
})
}
pub async fn run(&self) -> Result<()> {
info!(
"Starting heartbeat runner with interval: {:?}",
self.interval
);
loop {
sleep(self.interval).await;
if !self.in_active_hours() {
debug!("Outside active hours, skipping heartbeat");
emit_heartbeat_event(HeartbeatEvent {
ts: now_ms(),
status: HeartbeatStatus::Skipped,
duration_ms: 0,
preview: None,
reason: Some("outside active hours".to_string()),
});
continue;
}
let start = Instant::now();
match self.run_once_internal().await {
Ok((response, status)) => {
let duration_ms = start.elapsed().as_millis() as u64;
let preview = if response.len() > 200 {
Some(format!("{}...", &response[..200]))
} else {
Some(response.clone())
};
emit_heartbeat_event(HeartbeatEvent {
ts: now_ms(),
status,
duration_ms,
preview,
reason: None,
});
if is_heartbeat_ok(&response) {
debug!("Heartbeat: OK");
} else {
info!("Heartbeat response: {}", response);
}
}
Err(e) => {
let duration_ms = start.elapsed().as_millis() as u64;
emit_heartbeat_event(HeartbeatEvent {
ts: now_ms(),
status: HeartbeatStatus::Failed,
duration_ms,
preview: None,
reason: Some(e.to_string()),
});
warn!("Heartbeat error: {}", e);
}
}
}
}
pub async fn run_once(&self) -> Result<String> {
let start = Instant::now();
match self.run_once_internal().await {
Ok((response, status)) => {
let duration_ms = start.elapsed().as_millis() as u64;
let preview = if response.len() > 200 {
Some(format!("{}...", &response[..200]))
} else {
Some(response.clone())
};
emit_heartbeat_event(HeartbeatEvent {
ts: now_ms(),
status,
duration_ms,
preview,
reason: None,
});
Ok(response)
}
Err(e) => {
let duration_ms = start.elapsed().as_millis() as u64;
emit_heartbeat_event(HeartbeatEvent {
ts: now_ms(),
status: HeartbeatStatus::Failed,
duration_ms,
preview: None,
reason: Some(e.to_string()),
});
Err(e)
}
}
}
async fn run_once_internal(&self) -> Result<(String, HeartbeatStatus)> {
if let Some(ref gate) = self.turn_gate
&& gate.is_busy()
{
debug!("Skipping heartbeat: agent turn in flight (TurnGate busy)");
return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
}
let _ws_guard = match self.workspace_lock.try_acquire()? {
Some(guard) => guard,
None => {
debug!("Skipping heartbeat: workspace locked by another process");
return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
}
};
let _gate_permit = if let Some(ref gate) = self.turn_gate {
match gate.try_acquire() {
Some(permit) => Some(permit),
None => {
debug!("Skipping heartbeat: agent turn started between check and acquire");
return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
}
}
} else {
None
};
let heartbeat_path = self.workspace.join("HEARTBEAT.md");
if !heartbeat_path.exists() {
debug!("No HEARTBEAT.md found");
return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
}
let content = fs::read_to_string(&heartbeat_path)?;
if content.trim().is_empty() {
debug!("HEARTBEAT.md is empty");
return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
}
let agent_config = AgentConfig {
model: self.config.agent.default_model.clone(),
context_window: self.config.agent.context_window,
reserve_tokens: self.config.agent.reserve_tokens,
};
let mut agent = Agent::new(agent_config, &self.config, self.memory.clone()).await?;
agent.new_session().await?;
let workspace_is_git = self.workspace.join(".git").exists();
let heartbeat_prompt = build_heartbeat_prompt(workspace_is_git);
let response = agent.chat(&heartbeat_prompt).await?;
if is_heartbeat_ok(&response) {
return Ok((response, HeartbeatStatus::Ok));
}
let session_key = "heartbeat";
if let Ok(mut store) = SessionStore::load_for_agent(&self.agent_id) {
if let Some(entry) = store.get(session_key)
&& entry.is_duplicate_heartbeat(&response)
{
debug!(
"Skipping duplicate heartbeat (same text within 24h): {}",
&response[..response.len().min(100)]
);
return Ok((response, HeartbeatStatus::Skipped));
}
let session_id = agent.session_status().id;
if let Err(e) = store.load_and_update(session_key, &session_id, |entry| {
entry.record_heartbeat(&response);
}) {
warn!("Failed to record heartbeat in session store: {}", e);
}
}
Ok((response, HeartbeatStatus::Sent))
}
fn in_active_hours(&self) -> bool {
let Some((start, end)) = self.active_hours else {
return true; };
let now = Local::now().time();
if start <= end {
now >= start && now <= end
} else {
now >= start || now <= end
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_active_hours_normal_range() {
let start = NaiveTime::from_hms_opt(9, 0, 0).unwrap();
let end = NaiveTime::from_hms_opt(22, 0, 0).unwrap();
let noon = NaiveTime::from_hms_opt(12, 0, 0).unwrap();
let midnight = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
assert!(noon >= start && noon <= end);
assert!(!(midnight >= start && midnight <= end));
}
}