batty_cli/team/daemon/
state.rs1use std::collections::{HashMap, HashSet};
4use std::fs;
5use std::path::PathBuf;
6use std::time::{Duration, Instant};
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10use tracing::warn;
11
12use super::dispatch::DispatchQueueEntry;
13use super::{TeamDaemon, now_unix, standup};
14use crate::team::standup::MemberState;
15
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
17pub(super) struct PersistedNudgeState {
18 pub idle_elapsed_secs: Option<u64>,
19 pub fired_this_idle: bool,
20 pub paused: bool,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
24pub(super) struct PersistedDaemonState {
25 pub clean_shutdown: bool,
26 pub saved_at: u64,
27 pub states: HashMap<String, MemberState>,
28 pub active_tasks: HashMap<String, u32>,
29 pub retry_counts: HashMap<String, u32>,
30 #[serde(default)]
31 pub discord_event_cursor: usize,
32 #[serde(default)]
33 pub dispatch_queue: Vec<DispatchQueueEntry>,
34 pub paused_standups: HashSet<String>,
35 pub last_standup_elapsed_secs: HashMap<String, u64>,
36 pub nudge_state: HashMap<String, PersistedNudgeState>,
37 pub pipeline_starvation_fired: bool,
38 #[serde(default)]
39 pub optional_subsystem_backoff: HashMap<String, u32>,
40 #[serde(default)]
41 pub optional_subsystem_disabled_remaining_secs: HashMap<String, u64>,
42}
43
44impl TeamDaemon {
45 pub(super) fn restore_runtime_state(&mut self) {
46 let Some(state) = load_daemon_state(&self.config.project_root) else {
47 return;
48 };
49
50 self.states = state.states;
51 self.discord_event_cursor = self.discord_event_cursor.max(state.discord_event_cursor);
52 self.idle_started_at = self
53 .states
54 .iter()
55 .filter(|(_, state)| matches!(state, MemberState::Idle))
56 .map(|(member, _)| (member.clone(), Instant::now()))
57 .collect();
58 self.active_tasks = state.active_tasks;
59 self.retry_counts = state.retry_counts;
60 self.discord_event_cursor = state.discord_event_cursor;
61 self.dispatch_queue = state.dispatch_queue;
62 self.paused_standups = state.paused_standups;
63 self.last_standup = standup::restore_timer_state(state.last_standup_elapsed_secs);
64
65 for (member_name, persisted) in state.nudge_state {
66 let Some(schedule) = self.nudges.get_mut(&member_name) else {
67 continue;
68 };
69 schedule.idle_since = persisted.idle_elapsed_secs.map(|elapsed_secs| {
70 Instant::now()
71 .checked_sub(Duration::from_secs(elapsed_secs))
72 .unwrap_or_else(Instant::now)
73 });
74 schedule.fired_this_idle = persisted.fired_this_idle;
75 schedule.paused = persisted.paused;
76 }
77 self.pipeline_starvation_fired = state.pipeline_starvation_fired;
78 self.restore_optional_subsystem_budget_state(
79 &state.optional_subsystem_backoff,
80 &state.optional_subsystem_disabled_remaining_secs,
81 );
82 }
83
84 pub(super) fn persist_runtime_state(&self, clean_shutdown: bool) -> Result<()> {
85 let optional_subsystem_backoff = self.snapshot_optional_subsystem_backoff();
86 let optional_subsystem_disabled_remaining_secs =
87 self.snapshot_optional_subsystem_disabled_remaining_secs();
88 let state = PersistedDaemonState {
89 clean_shutdown,
90 saved_at: now_unix(),
91 states: self.states.clone(),
92 active_tasks: self.active_tasks.clone(),
93 retry_counts: self.retry_counts.clone(),
94 discord_event_cursor: self.discord_event_cursor,
95 dispatch_queue: self.dispatch_queue.clone(),
96 paused_standups: self.paused_standups.clone(),
97 last_standup_elapsed_secs: standup::snapshot_timer_state(&self.last_standup),
98 nudge_state: self
99 .nudges
100 .iter()
101 .map(|(member, schedule)| {
102 (
103 member.clone(),
104 PersistedNudgeState {
105 idle_elapsed_secs: schedule.idle_since.map(|t| t.elapsed().as_secs()),
106 fired_this_idle: schedule.fired_this_idle,
107 paused: schedule.paused,
108 },
109 )
110 })
111 .collect(),
112 pipeline_starvation_fired: self.pipeline_starvation_fired,
113 optional_subsystem_backoff,
114 optional_subsystem_disabled_remaining_secs,
115 };
116 save_daemon_state(&self.config.project_root, &state)
117 }
118}
119
120pub(super) fn daemon_state_path(project_root: &std::path::Path) -> PathBuf {
121 super::super::daemon_state_path(project_root)
122}
123
124pub(super) fn load_daemon_state(project_root: &std::path::Path) -> Option<PersistedDaemonState> {
125 let path = daemon_state_path(project_root);
126 let Ok(content) = fs::read_to_string(&path) else {
127 return None;
128 };
129
130 match serde_json::from_str(&content) {
131 Ok(state) => Some(state),
132 Err(error) => {
133 warn!(path = %path.display(), error = %error, "failed to parse daemon state, ignoring");
134 None
135 }
136 }
137}
138
139pub fn load_dispatch_queue_snapshot(project_root: &std::path::Path) -> Vec<DispatchQueueEntry> {
140 load_daemon_state(project_root)
141 .map(|state| state.dispatch_queue)
142 .unwrap_or_default()
143}
144
145pub(super) fn save_daemon_state(
146 project_root: &std::path::Path,
147 state: &PersistedDaemonState,
148) -> Result<()> {
149 let path = daemon_state_path(project_root);
150 if let Some(parent) = path.parent() {
151 fs::create_dir_all(parent)
152 .with_context(|| format!("failed to create {}", parent.display()))?;
153 }
154 let content =
155 serde_json::to_string_pretty(state).context("failed to serialize daemon state")?;
156 fs::write(&path, content).with_context(|| format!("failed to write {}", path.display()))?;
157 Ok(())
158}