Skip to main content

batty_cli/team/daemon/
state.rs

1//! Daemon state persistence — save/load/restore runtime state across restarts.
2
3use std::collections::{HashMap, HashSet};
4use std::fs;
5use std::path::PathBuf;
6use std::time::{Duration, Instant};
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10use tracing::warn;
11
12use super::dispatch::DispatchQueueEntry;
13use super::{TeamDaemon, now_unix, standup};
14use crate::team::standup::MemberState;
15
/// Serializable snapshot of one member's nudge schedule.
///
/// `Instant` values cannot be serialized, so the idle start is stored as an
/// elapsed duration and rebuilt relative to "now" when state is restored.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct PersistedNudgeState {
    /// Whole seconds elapsed since the schedule's `idle_since` at save time,
    /// or `None` when the schedule had no `idle_since` set.
    pub idle_elapsed_secs: Option<u64>,
    /// Copy of the schedule's `fired_this_idle` flag.
    pub fired_this_idle: bool,
    /// Copy of the schedule's `paused` flag.
    pub paused: bool,
}
22
/// On-disk representation of the daemon's runtime state (serialized as JSON).
///
/// Fields marked `#[serde(default)]` may be absent from the state file and
/// then deserialize to their default value; every other field is required for
/// the file to parse.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct PersistedDaemonState {
    /// Value of the `clean_shutdown` argument passed when the state was persisted.
    pub clean_shutdown: bool,
    /// Result of `now_unix()` at the moment the snapshot was built.
    pub saved_at: u64,
    /// Snapshot of the daemon's `states` map (keyed by member name on restore).
    pub states: HashMap<String, MemberState>,
    /// Snapshot of the daemon's `active_tasks` map.
    pub active_tasks: HashMap<String, u32>,
    /// Snapshot of the daemon's `retry_counts` map.
    pub retry_counts: HashMap<String, u32>,
    /// Snapshot of the daemon's `discord_event_cursor`.
    #[serde(default)]
    pub discord_event_cursor: usize,
    /// Snapshot of the daemon's pending dispatch queue.
    #[serde(default)]
    pub dispatch_queue: Vec<DispatchQueueEntry>,
    /// Snapshot of the daemon's `paused_standups` set.
    pub paused_standups: HashSet<String>,
    /// Standup timers as produced by `standup::snapshot_timer_state`
    /// (presumably seconds elapsed since each last standup — confirm there).
    pub last_standup_elapsed_secs: HashMap<String, u64>,
    /// Per-member nudge schedule snapshots, keyed by member name.
    pub nudge_state: HashMap<String, PersistedNudgeState>,
    /// Snapshot of the daemon's `pipeline_starvation_fired` flag.
    pub pipeline_starvation_fired: bool,
    /// Output of `snapshot_optional_subsystem_backoff()` at save time.
    #[serde(default)]
    pub optional_subsystem_backoff: HashMap<String, u32>,
    /// Output of `snapshot_optional_subsystem_disabled_remaining_secs()` at save time.
    #[serde(default)]
    pub optional_subsystem_disabled_remaining_secs: HashMap<String, u64>,
}
43
44impl TeamDaemon {
45    pub(super) fn restore_runtime_state(&mut self) {
46        let Some(state) = load_daemon_state(&self.config.project_root) else {
47            return;
48        };
49
50        self.states = state.states;
51        self.discord_event_cursor = self.discord_event_cursor.max(state.discord_event_cursor);
52        self.idle_started_at = self
53            .states
54            .iter()
55            .filter(|(_, state)| matches!(state, MemberState::Idle))
56            .map(|(member, _)| (member.clone(), Instant::now()))
57            .collect();
58        self.active_tasks = state.active_tasks;
59        self.retry_counts = state.retry_counts;
60        self.discord_event_cursor = state.discord_event_cursor;
61        self.dispatch_queue = state.dispatch_queue;
62        self.paused_standups = state.paused_standups;
63        self.last_standup = standup::restore_timer_state(state.last_standup_elapsed_secs);
64
65        for (member_name, persisted) in state.nudge_state {
66            let Some(schedule) = self.nudges.get_mut(&member_name) else {
67                continue;
68            };
69            schedule.idle_since = persisted.idle_elapsed_secs.map(|elapsed_secs| {
70                Instant::now()
71                    .checked_sub(Duration::from_secs(elapsed_secs))
72                    .unwrap_or_else(Instant::now)
73            });
74            schedule.fired_this_idle = persisted.fired_this_idle;
75            schedule.paused = persisted.paused;
76        }
77        self.pipeline_starvation_fired = state.pipeline_starvation_fired;
78        self.restore_optional_subsystem_budget_state(
79            &state.optional_subsystem_backoff,
80            &state.optional_subsystem_disabled_remaining_secs,
81        );
82    }
83
84    pub(super) fn persist_runtime_state(&self, clean_shutdown: bool) -> Result<()> {
85        let optional_subsystem_backoff = self.snapshot_optional_subsystem_backoff();
86        let optional_subsystem_disabled_remaining_secs =
87            self.snapshot_optional_subsystem_disabled_remaining_secs();
88        let state = PersistedDaemonState {
89            clean_shutdown,
90            saved_at: now_unix(),
91            states: self.states.clone(),
92            active_tasks: self.active_tasks.clone(),
93            retry_counts: self.retry_counts.clone(),
94            discord_event_cursor: self.discord_event_cursor,
95            dispatch_queue: self.dispatch_queue.clone(),
96            paused_standups: self.paused_standups.clone(),
97            last_standup_elapsed_secs: standup::snapshot_timer_state(&self.last_standup),
98            nudge_state: self
99                .nudges
100                .iter()
101                .map(|(member, schedule)| {
102                    (
103                        member.clone(),
104                        PersistedNudgeState {
105                            idle_elapsed_secs: schedule.idle_since.map(|t| t.elapsed().as_secs()),
106                            fired_this_idle: schedule.fired_this_idle,
107                            paused: schedule.paused,
108                        },
109                    )
110                })
111                .collect(),
112            pipeline_starvation_fired: self.pipeline_starvation_fired,
113            optional_subsystem_backoff,
114            optional_subsystem_disabled_remaining_secs,
115        };
116        save_daemon_state(&self.config.project_root, &state)
117    }
118}
119
120pub(super) fn daemon_state_path(project_root: &std::path::Path) -> PathBuf {
121    super::super::daemon_state_path(project_root)
122}
123
124pub(super) fn load_daemon_state(project_root: &std::path::Path) -> Option<PersistedDaemonState> {
125    let path = daemon_state_path(project_root);
126    let Ok(content) = fs::read_to_string(&path) else {
127        return None;
128    };
129
130    match serde_json::from_str(&content) {
131        Ok(state) => Some(state),
132        Err(error) => {
133            warn!(path = %path.display(), error = %error, "failed to parse daemon state, ignoring");
134            None
135        }
136    }
137}
138
139pub fn load_dispatch_queue_snapshot(project_root: &std::path::Path) -> Vec<DispatchQueueEntry> {
140    load_daemon_state(project_root)
141        .map(|state| state.dispatch_queue)
142        .unwrap_or_default()
143}
144
145pub(super) fn save_daemon_state(
146    project_root: &std::path::Path,
147    state: &PersistedDaemonState,
148) -> Result<()> {
149    let path = daemon_state_path(project_root);
150    if let Some(parent) = path.parent() {
151        fs::create_dir_all(parent)
152            .with_context(|| format!("failed to create {}", parent.display()))?;
153    }
154    let content =
155        serde_json::to_string_pretty(state).context("failed to serialize daemon state")?;
156    fs::write(&path, content).with_context(|| format!("failed to write {}", path.display()))?;
157    Ok(())
158}