localgpt 0.1.3

A local device focused AI assistant with persistent markdown memory, autonomous heartbeat tasks, and semantic search. Single binary, no runtime dependencies.
Documentation
//! Heartbeat runner for continuous autonomous operation

use anyhow::Result;
use chrono::{Local, NaiveTime};
use std::fs;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use tracing::{debug, info, warn};

use super::events::{HeartbeatEvent, HeartbeatStatus, emit_heartbeat_event, now_ms};
use crate::agent::{
    Agent, AgentConfig, HEARTBEAT_OK_TOKEN, SessionStore, build_heartbeat_prompt, is_heartbeat_ok,
};
use crate::concurrency::{TurnGate, WorkspaceLock};
use crate::config::{Config, parse_duration, parse_time};
use crate::memory::MemoryManager;

pub struct HeartbeatRunner {
    config: Config,
    interval: Duration,
    active_hours: Option<(NaiveTime, NaiveTime)>,
    workspace: PathBuf,
    agent_id: String,
    /// Cached MemoryManager to avoid reinitializing embedding provider on every heartbeat
    memory: MemoryManager,
    /// In-process turn gate (shared with HTTP server when running in daemon)
    turn_gate: Option<TurnGate>,
    /// Cross-process workspace lock
    workspace_lock: WorkspaceLock,
}

impl HeartbeatRunner {
    /// Create a new HeartbeatRunner with the default agent ID ("main")
    pub fn new(config: &Config) -> Result<Self> {
        Self::new_with_agent(config, "main")
    }

    /// Create a new HeartbeatRunner for a specific agent ID
    pub fn new_with_agent(config: &Config, agent_id: &str) -> Result<Self> {
        Self::new_with_gate(config, agent_id, None)
    }

    /// Create a new HeartbeatRunner with an optional in-process TurnGate.
    ///
    /// When running inside the daemon alongside the HTTP server, pass a
    /// shared `TurnGate` so heartbeat skips when an HTTP agent turn is active.
    pub fn new_with_gate(
        config: &Config,
        agent_id: &str,
        turn_gate: Option<TurnGate>,
    ) -> Result<Self> {
        let interval = parse_duration(&config.heartbeat.interval)
            .map_err(|e| anyhow::anyhow!("Invalid heartbeat interval: {}", e))?;

        let active_hours = if let Some(ref hours) = config.heartbeat.active_hours {
            let (start_h, start_m) = parse_time(&hours.start)
                .map_err(|e| anyhow::anyhow!("Invalid start time: {}", e))?;
            let (end_h, end_m) =
                parse_time(&hours.end).map_err(|e| anyhow::anyhow!("Invalid end time: {}", e))?;

            Some((
                NaiveTime::from_hms_opt(start_h as u32, start_m as u32, 0).unwrap(),
                NaiveTime::from_hms_opt(end_h as u32, end_m as u32, 0).unwrap(),
            ))
        } else {
            None
        };

        let workspace = config.workspace_path();

        // Create MemoryManager once and reuse it to avoid reinitializing embedding provider
        let memory = MemoryManager::new_with_full_config(&config.memory, Some(config), agent_id)?;
        let workspace_lock = WorkspaceLock::new()?;

        Ok(Self {
            config: config.clone(),
            interval,
            active_hours,
            workspace,
            agent_id: agent_id.to_string(),
            memory,
            turn_gate,
            workspace_lock,
        })
    }

    /// Run the heartbeat loop continuously
    pub async fn run(&self) -> Result<()> {
        info!(
            "Starting heartbeat runner with interval: {:?}",
            self.interval
        );

        loop {
            // Sleep until next interval
            sleep(self.interval).await;

            // Check active hours
            if !self.in_active_hours() {
                debug!("Outside active hours, skipping heartbeat");
                emit_heartbeat_event(HeartbeatEvent {
                    ts: now_ms(),
                    status: HeartbeatStatus::Skipped,
                    duration_ms: 0,
                    preview: None,
                    reason: Some("outside active hours".to_string()),
                });
                continue;
            }

            // Run heartbeat with timing
            let start = Instant::now();
            match self.run_once_internal().await {
                Ok((response, status)) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    let preview = if response.len() > 200 {
                        Some(format!("{}...", &response[..200]))
                    } else {
                        Some(response.clone())
                    };

                    emit_heartbeat_event(HeartbeatEvent {
                        ts: now_ms(),
                        status,
                        duration_ms,
                        preview,
                        reason: None,
                    });

                    if is_heartbeat_ok(&response) {
                        debug!("Heartbeat: OK");
                    } else {
                        info!("Heartbeat response: {}", response);
                    }
                }
                Err(e) => {
                    let duration_ms = start.elapsed().as_millis() as u64;
                    emit_heartbeat_event(HeartbeatEvent {
                        ts: now_ms(),
                        status: HeartbeatStatus::Failed,
                        duration_ms,
                        preview: None,
                        reason: Some(e.to_string()),
                    });
                    warn!("Heartbeat error: {}", e);
                }
            }
        }
    }

    /// Run a single heartbeat cycle (public API, emits events)
    pub async fn run_once(&self) -> Result<String> {
        let start = Instant::now();

        match self.run_once_internal().await {
            Ok((response, status)) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                let preview = if response.len() > 200 {
                    Some(format!("{}...", &response[..200]))
                } else {
                    Some(response.clone())
                };

                emit_heartbeat_event(HeartbeatEvent {
                    ts: now_ms(),
                    status,
                    duration_ms,
                    preview,
                    reason: None,
                });

                Ok(response)
            }
            Err(e) => {
                let duration_ms = start.elapsed().as_millis() as u64;
                emit_heartbeat_event(HeartbeatEvent {
                    ts: now_ms(),
                    status: HeartbeatStatus::Failed,
                    duration_ms,
                    preview: None,
                    reason: Some(e.to_string()),
                });
                Err(e)
            }
        }
    }

    /// Internal heartbeat execution (returns response and status)
    async fn run_once_internal(&self) -> Result<(String, HeartbeatStatus)> {
        // Skip if an in-process agent turn is already in flight
        if let Some(ref gate) = self.turn_gate
            && gate.is_busy()
        {
            debug!("Skipping heartbeat: agent turn in flight (TurnGate busy)");
            return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
        }

        // Try to acquire the cross-process workspace lock (non-blocking)
        let _ws_guard = match self.workspace_lock.try_acquire()? {
            Some(guard) => guard,
            None => {
                debug!("Skipping heartbeat: workspace locked by another process");
                return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
            }
        };

        // Try to acquire the in-process turn gate (non-blocking, race between
        // the is_busy check above and now)
        let _gate_permit = if let Some(ref gate) = self.turn_gate {
            match gate.try_acquire() {
                Some(permit) => Some(permit),
                None => {
                    debug!("Skipping heartbeat: agent turn started between check and acquire");
                    return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
                }
            }
        } else {
            None
        };

        // Check if HEARTBEAT.md exists and has content
        let heartbeat_path = self.workspace.join("HEARTBEAT.md");

        if !heartbeat_path.exists() {
            debug!("No HEARTBEAT.md found");
            return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
        }

        let content = fs::read_to_string(&heartbeat_path)?;
        if content.trim().is_empty() {
            debug!("HEARTBEAT.md is empty");
            return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
        }

        // Create agent for heartbeat (clone the cached MemoryManager to share the embedding provider)
        let agent_config = AgentConfig {
            model: self.config.agent.default_model.clone(),
            context_window: self.config.agent.context_window,
            reserve_tokens: self.config.agent.reserve_tokens,
        };

        let mut agent = Agent::new(agent_config, &self.config, self.memory.clone()).await?;
        agent.new_session().await?;

        // Check if workspace is a git repo
        let workspace_is_git = self.workspace.join(".git").exists();

        // Send heartbeat prompt
        let heartbeat_prompt = build_heartbeat_prompt(workspace_is_git);
        let response = agent.chat(&heartbeat_prompt).await?;

        // Determine status based on response
        if is_heartbeat_ok(&response) {
            return Ok((response, HeartbeatStatus::Ok));
        }

        // For actual alerts, check for deduplication
        let session_key = "heartbeat"; // Use dedicated session key for heartbeat state

        // Load session store to check for duplicates
        if let Ok(mut store) = SessionStore::load_for_agent(&self.agent_id) {
            if let Some(entry) = store.get(session_key)
                && entry.is_duplicate_heartbeat(&response)
            {
                debug!(
                    "Skipping duplicate heartbeat (same text within 24h): {}",
                    &response[..response.len().min(100)]
                );
                return Ok((response, HeartbeatStatus::Skipped));
            }

            // Record the heartbeat (re-read from disk to avoid clobbering)
            let session_id = agent.session_status().id;
            if let Err(e) = store.load_and_update(session_key, &session_id, |entry| {
                entry.record_heartbeat(&response);
            }) {
                warn!("Failed to record heartbeat in session store: {}", e);
            }
        }

        Ok((response, HeartbeatStatus::Sent))
    }

    fn in_active_hours(&self) -> bool {
        let Some((start, end)) = self.active_hours else {
            return true; // No active hours configured, always active
        };

        let now = Local::now().time();

        if start <= end {
            // Normal range (e.g., 09:00 to 22:00)
            now >= start && now <= end
        } else {
            // Overnight range (e.g., 22:00 to 06:00)
            now >= start || now <= end
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_active_hours_normal_range() {
        // This test would require mocking Local::now()
        // For now, just verify the logic pattern
        let start = NaiveTime::from_hms_opt(9, 0, 0).unwrap();
        let end = NaiveTime::from_hms_opt(22, 0, 0).unwrap();

        let noon = NaiveTime::from_hms_opt(12, 0, 0).unwrap();
        let midnight = NaiveTime::from_hms_opt(0, 0, 0).unwrap();

        assert!(noon >= start && noon <= end);
        assert!(!(midnight >= start && midnight <= end));
    }
}