use std::time::Duration;
use tokio_util::sync::CancellationToken;
use crate::runtime::db::{error::DbError, repo::goal::GoalRepo, DbHandle};
use crate::runtime::goal::state::goals_db_path;
/// Period, in seconds, of the ticker that drives the heartbeat loops in this file.
const GOAL_HEARTBEAT_INTERVAL_SECS: u64 = 10;
/// Maximum age of a goal's `updated_at`, in seconds, before the orphan scan reports the
/// goal as stale. The comparison is strict (`>`), and the value equals three heartbeat
/// intervals.
const GOAL_STALE_THRESHOLD_SECS: i64 = 30;
/// Records the current process as the controller of `goal_id` and starts its heartbeat.
///
/// Opens the goals database, stores this process's PID through
/// `update_controller_pid`, and spawns a background task running `heartbeat_loop`
/// for the goal. The returned handle owns the cancellation token for that task.
///
/// NOTE(review): the PID is written without first checking whether another controller
/// is already recorded — confirm whether `update_controller_pid` guards against that.
///
/// # Errors
///
/// Returns an error if the database cannot be opened, if the process id does not fit
/// in an `i32`, or if storing the controller PID fails.
pub async fn claim_goal(goal_id: &str) -> anyhow::Result<GoalHeartbeatHandle> {
    let db = DbHandle::open(goals_db_path()).await?;

    // A plain `as i32` cast would silently wrap large ids into negative numbers, which
    // the orphan scan interprets as "no controller". Fail loudly instead.
    let raw_pid = std::process::id();
    let pid = <i32 as std::convert::TryFrom<u32>>::try_from(raw_pid).map_err(|_| {
        anyhow::anyhow!("process id {raw_pid} does not fit in an i32 controller pid")
    })?;

    db.goal_repo()
        .update_controller_pid(goal_id, Some(pid))
        .await?;

    let cancel = CancellationToken::new();
    let task = tokio::spawn(heartbeat_loop(goal_id.to_string(), cancel.clone()));
    Ok(GoalHeartbeatHandle {
        _task: task,
        cancel,
    })
}
pub async fn release_goal(goal_id: &str) -> anyhow::Result<()> {
let db = DbHandle::open(goals_db_path()).await?;
db.goal_repo()
.update_controller_pid(goal_id, None)
.await
.map_err(db_to_anyhow)?;
Ok(())
}
/// Scans the goals database for running goals that appear to have lost their controller.
///
/// A running goal is reported when either of these holds:
/// - it has no positive controller PID, or `is_pid_alive` says that PID is gone;
/// - its `updated_at` is more than `GOAL_STALE_THRESHOLD_SECS` seconds behind the
///   current UTC timestamp.
///
/// Goals listed as running but no longer returned by `get` are skipped.
///
/// # Errors
///
/// Propagates failures from opening the database, listing running goals, or loading
/// an individual record.
pub async fn list_orphaned_goals() -> anyhow::Result<Vec<OrphanedGoal>> {
    let db = DbHandle::open(goals_db_path()).await?;
    let running = db.goal_repo().list_running().await?;

    let mut orphans = Vec::new();
    for summary in running {
        let lookup = db.goal_repo().get(&summary.goal_id).await?;
        let record = if let Some(found) = lookup {
            found
        } else {
            continue;
        };

        // The liveness probe only runs for positive PIDs; anything else counts as gone.
        let controller_gone =
            !matches!(record.controller_pid, Some(pid) if pid > 0 && is_pid_alive(pid));

        let heartbeat_age = chrono::Utc::now().timestamp() - record.updated_at;
        let heartbeat_expired = heartbeat_age > GOAL_STALE_THRESHOLD_SECS;

        if !controller_gone && !heartbeat_expired {
            continue;
        }

        orphans.push(OrphanedGoal {
            goal_id: summary.goal_id,
            controller_pid: record.controller_pid,
            last_updated_at: record.updated_at,
        });
    }
    Ok(orphans)
}
/// A running goal that the orphan scan flagged because its controller PID is missing or
/// not alive, or because its last update is older than `GOAL_STALE_THRESHOLD_SECS`.
#[derive(Debug, Clone)]
pub struct OrphanedGoal {
/// Identifier of the goal, taken from the running-goal summary.
pub goal_id: String,
/// Controller PID copied from the goal record; `None` when no controller is recorded.
pub controller_pid: Option<i32>,
/// The record's `updated_at` value. The scan compares it against
/// `chrono::Utc::now().timestamp()`, so it is treated as Unix seconds.
pub last_updated_at: i64,
}
/// Handle to the background heartbeat task spawned when a goal is claimed.
///
/// NOTE(review): there is no `Drop` impl in this file, so dropping the handle without
/// calling `stop` presumably leaves the heartbeat task running — confirm this is intended.
#[derive(Debug)]
pub struct GoalHeartbeatHandle {
// Join handle of the spawned heartbeat task; stored but never awaited in this file.
_task: tokio::task::JoinHandle<()>,
// Token whose cancellation makes the heartbeat loop break out and finish.
cancel: CancellationToken,
}
impl GoalHeartbeatHandle {
pub fn stop(self) {
self.cancel.cancel();
}
}
/// Background loop that refreshes the heartbeat of `goal_id` until `cancel` fires.
///
/// On every tick of a `GOAL_HEARTBEAT_INTERVAL_SECS` interval the loop calls
/// `send_heartbeat`. A failed heartbeat is logged as a warning and the loop keeps
/// going. Cancellation ends the loop and emits a debug log line.
///
/// NOTE(review): tokio documents that an interval's first tick completes immediately,
/// so the first heartbeat is presumably written right after the task starts.
async fn heartbeat_loop(goal_id: String, cancel: CancellationToken) {
    let period = Duration::from_secs(GOAL_HEARTBEAT_INTERVAL_SECS);
    let mut ticker = tokio::time::interval(period);
    loop {
        tokio::select! {
            _ = ticker.tick() => {
                match send_heartbeat(&goal_id).await {
                    Ok(()) => {}
                    Err(e) => {
                        tracing::warn!(goal_id, error = %e, "Goal heartbeat failed");
                    }
                }
            }
            _ = cancel.cancelled() => break,
        }
    }
    tracing::debug!(goal_id, "Goal heartbeat loop stopped");
}
/// Writes one heartbeat for `goal_id` to the goals database.
///
/// A database handle is opened on each call.
///
/// # Errors
///
/// Returns the `DbError` from opening the database or from the heartbeat write.
async fn send_heartbeat(goal_id: &str) -> Result<(), DbError> {
    let path = goals_db_path();
    let handle = DbHandle::open(path).await?;
    handle.goal_repo().heartbeat(goal_id).await
}
#[cfg(unix)]
fn is_pid_alive(pid: i32) -> bool {
unsafe { libc::kill(pid, 0) == 0 }
}
/// Fallback liveness probe for non-Unix targets.
///
/// No process lookup is performed here, so every PID is reported as alive. Orphan
/// detection on these targets therefore depends on the heartbeat-age check alone.
#[cfg(not(unix))]
fn is_pid_alive(_pid: i32) -> bool {
    // The parameter is intentionally unused; the leading underscore silences the
    // `unused_variables` warning while keeping the signature aligned with the Unix version.
    true
}
/// Converts a `DbError` into an `anyhow::Error` whose message is `db error: ` followed
/// by the error's `Display` text.
///
/// NOTE(review): the error is flattened into a string here, so the original `DbError`
/// value is not carried along as a source — confirm callers do not need to downcast.
fn db_to_anyhow(e: DbError) -> anyhow::Error {
anyhow::anyhow!("db error: {e}")
}
/// Builds a `GoalRecord` fixture with placeholder values for use in tests.
///
/// Only the goal id, status and controller PID come from the arguments. Every other
/// field gets a fixed placeholder, and both timestamps are set to the current UTC time
/// in seconds.
pub fn test_goal_record(
    goal_id: &str,
    status: &str,
    pid: Option<i32>,
) -> crate::runtime::db::types::GoalRecord {
    use crate::runtime::db::types::GoalRecord;

    // The clock is read once per timestamp, in this order.
    let created_at = chrono::Utc::now().timestamp();
    let updated_at = chrono::Utc::now().timestamp();

    GoalRecord {
        goal_id: String::from(goal_id),
        status: String::from(status),
        phase: String::from("execution"),
        kind: None,
        original_goal: String::from("test"),
        normalized_goal: String::from("test"),
        goal_text: String::from("test"),
        project_dir: String::from("/tmp"),
        state_dir: String::from("/tmp"),
        policy: String::from("local"),
        delivery_policy: String::from("local"),
        merge_policy: String::from("disabled"),
        until_ready: false,
        slice_execution: false,
        max_agents: None,
        budget_time: None,
        budget_tokens: None,
        budget_usd: None,
        cost_tracker_path: None,
        terminal_criteria: None,
        failure: None,
        created_at,
        updated_at,
        completed_at: None,
        controller_pid: pid,
        version: 1,
    }
}
/// Test variant of `claim_goal` that takes an explicit PID and database path.
///
/// Stores `pid` as the controller of `goal_id` in the database at `db_path`, then
/// spawns `heartbeat_loop_with_path` against that same path and returns its handle.
///
/// # Errors
///
/// Propagates failures from opening the database or storing the controller PID.
pub async fn claim_goal_for_test(
    goal_id: &str,
    pid: i32,
    db_path: &std::path::Path,
) -> anyhow::Result<GoalHeartbeatHandle> {
    let db = DbHandle::open(db_path).await?;
    db.goal_repo()
        .update_controller_pid(goal_id, Some(pid))
        .await?;

    let cancel = CancellationToken::new();
    // The loop gets owned copies of everything so the spawned task borrows nothing
    // from this call.
    let heartbeat =
        heartbeat_loop_with_path(goal_id.to_string(), cancel.clone(), db_path.to_path_buf());
    let task = tokio::spawn(heartbeat);

    Ok(GoalHeartbeatHandle {
        _task: task,
        cancel,
    })
}
/// Heartbeat loop for `goal_id` that writes to the database at `db_path`.
///
/// On every tick of a `GOAL_HEARTBEAT_INTERVAL_SECS` interval the loop calls
/// `send_heartbeat_to_path`. Failures are logged as warnings and do not end the loop.
/// Cancelling `cancel` does, after which a debug log line is emitted.
async fn heartbeat_loop_with_path(
    goal_id: String,
    cancel: CancellationToken,
    db_path: std::path::PathBuf,
) {
    let period = Duration::from_secs(GOAL_HEARTBEAT_INTERVAL_SECS);
    let mut ticker = tokio::time::interval(period);
    loop {
        tokio::select! {
            _ = ticker.tick() => {
                match send_heartbeat_to_path(&goal_id, &db_path).await {
                    Ok(()) => {}
                    Err(e) => {
                        tracing::warn!(goal_id, error = %e, "Goal heartbeat failed");
                    }
                }
            }
            _ = cancel.cancelled() => break,
        }
    }
    tracing::debug!(goal_id, "Goal heartbeat loop stopped");
}
/// Writes one heartbeat for `goal_id` to the database located at `db_path`.
///
/// A database handle is opened on each call.
///
/// # Errors
///
/// Returns the `DbError` from opening the database or from the heartbeat write.
async fn send_heartbeat_to_path(goal_id: &str, db_path: &std::path::Path) -> Result<(), DbError> {
    let handle = DbHandle::open(db_path).await?;
    handle.goal_repo().heartbeat(goal_id).await
}
pub async fn release_goal_for_test(goal_id: &str, db_path: &std::path::Path) -> anyhow::Result<()> {
let db = DbHandle::open(db_path).await?;
db.goal_repo()
.update_controller_pid(goal_id, None)
.await
.map_err(db_to_anyhow)?;
Ok(())
}
/// Test variant of `list_orphaned_goals` that scans an already-open database handle.
///
/// A running goal is reported when it has no positive controller PID, when
/// `is_pid_alive` says that PID is gone, or when its `updated_at` is more than
/// `GOAL_STALE_THRESHOLD_SECS` seconds behind the current UTC timestamp. Goals that
/// `get` no longer returns are skipped.
///
/// # Errors
///
/// Propagates failures from listing running goals or loading an individual record.
pub async fn list_orphaned_goals_for_test(db: &DbHandle) -> anyhow::Result<Vec<OrphanedGoal>> {
    let running = db.goal_repo().list_running().await?;

    let mut orphans = Vec::new();
    for summary in running {
        let lookup = db.goal_repo().get(&summary.goal_id).await?;
        let record = if let Some(found) = lookup {
            found
        } else {
            continue;
        };

        // The liveness probe only runs for positive PIDs; anything else counts as gone.
        let controller_gone =
            !matches!(record.controller_pid, Some(pid) if pid > 0 && is_pid_alive(pid));

        let heartbeat_age = chrono::Utc::now().timestamp() - record.updated_at;
        let heartbeat_expired = heartbeat_age > GOAL_STALE_THRESHOLD_SECS;

        if !controller_gone && !heartbeat_expired {
            continue;
        }

        orphans.push(OrphanedGoal {
            goal_id: summary.goal_id,
            controller_pid: record.controller_pid,
            last_updated_at: record.updated_at,
        });
    }
    Ok(orphans)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::db::repo::goal::GoalRepo;

    /// Borrows the goal ids out of a list of orphaned goals, preserving order.
    fn orphan_ids(orphans: &[OrphanedGoal]) -> Vec<&str> {
        orphans.iter().map(|o| o.goal_id.as_str()).collect()
    }

    /// Setting and then clearing the controller PID is reflected by `get`.
    #[tokio::test]
    async fn controller_pid_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test.db");
        let db = DbHandle::open(&db_path).await.unwrap();

        let goal = test_goal_record("goal-pid", "running", None);
        db.goal_repo().create(&goal).await.unwrap();

        // First store a PID, then clear it again; each write must be readable back.
        for expected in [Some(42), None] {
            db.goal_repo()
                .update_controller_pid("goal-pid", expected)
                .await
                .unwrap();
            let record = db.goal_repo().get("goal-pid").await.unwrap().unwrap();
            assert_eq!(record.controller_pid, expected);
        }
    }

    /// A heartbeat moves `updated_at` forward to roughly the current time.
    #[tokio::test]
    async fn heartbeat_updates_timestamp() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test.db");
        let db = DbHandle::open(&db_path).await.unwrap();

        // Start with a timestamp two seconds in the past so the bump is observable.
        let goal = crate::runtime::db::types::GoalRecord {
            updated_at: chrono::Utc::now().timestamp() - 2,
            ..test_goal_record("goal-hb", "running", None)
        };
        db.goal_repo().create(&goal).await.unwrap();

        db.goal_repo().heartbeat("goal-hb").await.unwrap();

        let refreshed = db.goal_repo().get("goal-hb").await.unwrap().unwrap();
        let after = refreshed.updated_at;
        assert!(
            after >= chrono::Utc::now().timestamp() - 1,
            "heartbeat should bump updated_at to near-current time"
        );
    }

    /// A goal controlled by this process is kept; one with an unused PID is reported.
    #[tokio::test]
    #[cfg(unix)]
    async fn list_orphaned_detects_dead_pid() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test.db");
        let db = DbHandle::open(&db_path).await.unwrap();

        let my_pid = std::process::id() as i32;
        let alive = test_goal_record("goal-alive", "running", Some(my_pid));
        db.goal_repo().create(&alive).await.unwrap();

        // Assumes no process with PID 999999 exists on the machine running the test.
        let dead = test_goal_record("goal-dead", "running", Some(999_999));
        db.goal_repo().create(&dead).await.unwrap();

        let orphaned = list_orphaned_goals_for_test(&db).await.unwrap();
        let ids = orphan_ids(&orphaned);
        assert!(!ids.contains(&"goal-alive"), "own PID should be alive");
        assert!(ids.contains(&"goal-dead"), "PID 999999 should be dead");
    }

    /// A goal whose last update is an hour old is reported regardless of its PID.
    #[tokio::test]
    async fn list_orphaned_detects_stale_heartbeat() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test.db");
        let db = DbHandle::open(&db_path).await.unwrap();

        let mut stale = test_goal_record("goal-stale", "running", Some(1));
        stale.updated_at = chrono::Utc::now().timestamp() - 3600;
        db.goal_repo().create(&stale).await.unwrap();

        let orphaned = list_orphaned_goals_for_test(&db).await.unwrap();
        let ids = orphan_ids(&orphaned);
        assert!(ids.contains(&"goal-stale"));
    }
}