Skip to main content

batty_cli/team/daemon/
state.rs

1//! Daemon state persistence — save/load/restore runtime state across restarts.
2
3use std::collections::{HashMap, HashSet};
4use std::fs;
5use std::path::PathBuf;
6use std::time::{Duration, Instant};
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10use tracing::warn;
11
12use super::dispatch::DispatchQueueEntry;
13use super::{TeamDaemon, now_unix, standup};
14use crate::team::standup::MemberState;
15
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
17pub(super) struct PersistedNudgeState {
18    pub idle_elapsed_secs: Option<u64>,
19    pub fired_this_idle: bool,
20    pub paused: bool,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
24pub(super) struct PersistedDaemonState {
25    pub clean_shutdown: bool,
26    pub saved_at: u64,
27    pub states: HashMap<String, MemberState>,
28    pub active_tasks: HashMap<String, u32>,
29    pub retry_counts: HashMap<String, u32>,
30    #[serde(default)]
31    pub dispatch_queue: Vec<DispatchQueueEntry>,
32    pub paused_standups: HashSet<String>,
33    pub last_standup_elapsed_secs: HashMap<String, u64>,
34    pub nudge_state: HashMap<String, PersistedNudgeState>,
35    pub pipeline_starvation_fired: bool,
36}
37
38impl TeamDaemon {
39    pub(super) fn restore_runtime_state(&mut self) {
40        let Some(state) = load_daemon_state(&self.config.project_root) else {
41            return;
42        };
43
44        self.states = state.states;
45        self.idle_started_at = self
46            .states
47            .iter()
48            .filter(|(_, state)| matches!(state, MemberState::Idle))
49            .map(|(member, _)| (member.clone(), Instant::now()))
50            .collect();
51        self.active_tasks = state.active_tasks;
52        self.retry_counts = state.retry_counts;
53        self.dispatch_queue = state.dispatch_queue;
54        self.paused_standups = state.paused_standups;
55        self.last_standup = standup::restore_timer_state(state.last_standup_elapsed_secs);
56
57        for (member_name, persisted) in state.nudge_state {
58            let Some(schedule) = self.nudges.get_mut(&member_name) else {
59                continue;
60            };
61            schedule.idle_since = persisted.idle_elapsed_secs.map(|elapsed_secs| {
62                Instant::now()
63                    .checked_sub(Duration::from_secs(elapsed_secs))
64                    .unwrap_or_else(Instant::now)
65            });
66            schedule.fired_this_idle = persisted.fired_this_idle;
67            schedule.paused = persisted.paused;
68        }
69        self.pipeline_starvation_fired = state.pipeline_starvation_fired;
70    }
71
72    pub(super) fn persist_runtime_state(&self, clean_shutdown: bool) -> Result<()> {
73        let state = PersistedDaemonState {
74            clean_shutdown,
75            saved_at: now_unix(),
76            states: self.states.clone(),
77            active_tasks: self.active_tasks.clone(),
78            retry_counts: self.retry_counts.clone(),
79            dispatch_queue: self.dispatch_queue.clone(),
80            paused_standups: self.paused_standups.clone(),
81            last_standup_elapsed_secs: standup::snapshot_timer_state(&self.last_standup),
82            nudge_state: self
83                .nudges
84                .iter()
85                .map(|(member, schedule)| {
86                    (
87                        member.clone(),
88                        PersistedNudgeState {
89                            idle_elapsed_secs: schedule.idle_since.map(|t| t.elapsed().as_secs()),
90                            fired_this_idle: schedule.fired_this_idle,
91                            paused: schedule.paused,
92                        },
93                    )
94                })
95                .collect(),
96            pipeline_starvation_fired: self.pipeline_starvation_fired,
97        };
98        save_daemon_state(&self.config.project_root, &state)
99    }
100}
101
102pub(super) fn daemon_state_path(project_root: &std::path::Path) -> PathBuf {
103    super::super::daemon_state_path(project_root)
104}
105
106pub(super) fn load_daemon_state(project_root: &std::path::Path) -> Option<PersistedDaemonState> {
107    let path = daemon_state_path(project_root);
108    let Ok(content) = fs::read_to_string(&path) else {
109        return None;
110    };
111
112    match serde_json::from_str(&content) {
113        Ok(state) => Some(state),
114        Err(error) => {
115            warn!(path = %path.display(), error = %error, "failed to parse daemon state, ignoring");
116            None
117        }
118    }
119}
120
121pub fn load_dispatch_queue_snapshot(project_root: &std::path::Path) -> Vec<DispatchQueueEntry> {
122    load_daemon_state(project_root)
123        .map(|state| state.dispatch_queue)
124        .unwrap_or_default()
125}
126
127pub(super) fn save_daemon_state(
128    project_root: &std::path::Path,
129    state: &PersistedDaemonState,
130) -> Result<()> {
131    let path = daemon_state_path(project_root);
132    if let Some(parent) = path.parent() {
133        fs::create_dir_all(parent)
134            .with_context(|| format!("failed to create {}", parent.display()))?;
135    }
136    let content =
137        serde_json::to_string_pretty(state).context("failed to serialize daemon state")?;
138    fs::write(&path, content).with_context(|| format!("failed to write {}", path.display()))?;
139    Ok(())
140}