1use crate::config::StatesConfig;
4use crate::db::{now_ms, Database};
5use crate::types::TaskStateEvent;
6use anyhow::Result;
7use rusqlite::{params, Connection};
8
9pub(crate) fn record_state_transition(
13 conn: &Connection,
14 task_id: &str,
15 to_status: &str,
16 worker_id: Option<&str>,
17 reason: Option<&str>,
18 states_config: &StatesConfig,
19) -> Result<i64> {
20 let now = now_ms();
21 let mut elapsed_added = 0i64;
22
23 let open_transition: Option<(i64, String, i64)> = conn
25 .query_row(
26 "SELECT id, event, timestamp FROM task_state_sequence
27 WHERE task_id = ?1 AND end_timestamp IS NULL
28 ORDER BY id DESC LIMIT 1",
29 params![task_id],
30 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
31 )
32 .ok();
33
34 if let Some((open_id, prev_event_str, start_timestamp)) = open_transition {
35 conn.execute(
37 "UPDATE task_state_sequence SET end_timestamp = ?1 WHERE id = ?2",
38 params![now, open_id],
39 )?;
40
41 if states_config.is_timed_state(&prev_event_str) {
43 elapsed_added = now - start_timestamp;
44
45 conn.execute(
47 "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
48 WHERE id = ?3",
49 params![elapsed_added, now, task_id],
50 )?;
51 }
52 }
53
54 conn.execute(
56 "INSERT INTO task_state_sequence (task_id, worker_id, event, reason, timestamp)
57 VALUES (?1, ?2, ?3, ?4, ?5)",
58 params![task_id, worker_id, to_status, reason, now],
59 )?;
60
61 Ok(elapsed_added)
62}
63
/// Aggregate statistics over rows of `task_state_sequence`, as produced by
/// `Database::get_project_state_stats`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProjectStateStats {
    /// Number of transition rows that matched the query.
    pub total_transitions: i64,
    /// Sum of `end_timestamp - timestamp` over matched rows that have an
    /// `end_timestamp`.
    pub total_time_ms: i64,
    /// Number of distinct `task_id` values among the matched rows.
    pub tasks_affected: i64,
    /// Matched row count keyed by the row's `event` value.
    pub transitions_by_status: std::collections::HashMap<String, i64>,
    /// Summed duration of closed rows keyed by the row's `event` value.
    pub time_by_status_ms: std::collections::HashMap<String, i64>,
    /// Matched row count keyed by `worker_id`; rows without a worker are not counted here.
    pub transitions_by_agent: std::collections::HashMap<String, i64>,
    /// Summed duration of closed rows keyed by `worker_id`; rows without a worker are not counted here.
    pub time_by_agent_ms: std::collections::HashMap<String, i64>,
}
75
76impl Database {
77 pub fn get_task_state_history(&self, task_id: &str) -> Result<Vec<TaskStateEvent>> {
79 self.with_conn(|conn| {
80 let mut stmt = conn.prepare(
81 "SELECT id, task_id, worker_id, event, reason, timestamp, end_timestamp
82 FROM task_state_sequence
83 WHERE task_id = ?1
84 ORDER BY id ASC",
85 )?;
86
87 let events = stmt
88 .query_map(params![task_id], |row| {
89 Ok(TaskStateEvent {
90 id: row.get(0)?,
91 task_id: row.get(1)?,
92 worker_id: row.get(2)?,
93 event: row.get(3)?,
94 reason: row.get(4)?,
95 timestamp: row.get(5)?,
96 end_timestamp: row.get(6)?,
97 })
98 })?
99 .collect::<Result<Vec<_>, _>>()?;
100
101 Ok(events)
102 })
103 }
104
105 pub fn get_current_state_duration(
108 &self,
109 task_id: &str,
110 states_config: &StatesConfig,
111 ) -> Result<Option<i64>> {
112 self.with_conn(|conn| {
113 let result: Option<(String, i64)> = conn
114 .query_row(
115 "SELECT event, timestamp FROM task_state_sequence
116 WHERE task_id = ?1 AND end_timestamp IS NULL
117 ORDER BY id DESC LIMIT 1",
118 params![task_id],
119 |row| Ok((row.get(0)?, row.get(1)?)),
120 )
121 .ok();
122
123 match result {
124 Some((event_str, start_timestamp)) => {
125 if states_config.is_timed_state(&event_str) {
126 return Ok(Some(now_ms() - start_timestamp));
127 }
128 Ok(None)
129 }
130 None => Ok(None),
131 }
132 })
133 }
134
135 pub fn get_project_state_history(
138 &self,
139 from_timestamp: Option<i64>,
140 to_timestamp: Option<i64>,
141 state_filter: Option<&[String]>,
142 limit: Option<i64>,
143 ) -> Result<Vec<TaskStateEvent>> {
144 self.with_conn(|conn| {
145 let mut sql = String::from(
147 "SELECT id, task_id, worker_id, event, reason, timestamp, end_timestamp
148 FROM task_state_sequence WHERE 1=1"
149 );
150 let mut param_values: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
151
152 if let Some(from_ts) = from_timestamp {
153 sql.push_str(&format!(" AND timestamp >= ?{}", param_values.len() + 1));
154 param_values.push(Box::new(from_ts));
155 }
156
157 if let Some(to_ts) = to_timestamp {
158 sql.push_str(&format!(" AND timestamp <= ?{}", param_values.len() + 1));
159 param_values.push(Box::new(to_ts));
160 }
161
162 if let Some(states) = state_filter {
163 if !states.is_empty() {
164 let placeholders: Vec<String> = states.iter().enumerate()
165 .map(|(i, _)| format!("?{}", param_values.len() + i + 1))
166 .collect();
167 sql.push_str(&format!(" AND event IN ({})", placeholders.join(", ")));
168 for state in states {
169 param_values.push(Box::new(state.clone()));
170 }
171 }
172 }
173
174 sql.push_str(" ORDER BY timestamp DESC, id DESC");
175
176 if let Some(lim) = limit {
177 sql.push_str(&format!(" LIMIT ?{}", param_values.len() + 1));
178 param_values.push(Box::new(lim));
179 }
180
181 let mut stmt = conn.prepare(&sql)?;
182
183 let param_refs: Vec<&dyn rusqlite::ToSql> = param_values.iter().map(|b| b.as_ref()).collect();
185
186 let events = stmt
187 .query_map(param_refs.as_slice(), |row| {
188 Ok(TaskStateEvent {
189 id: row.get(0)?,
190 task_id: row.get(1)?,
191 worker_id: row.get(2)?,
192 event: row.get(3)?,
193 reason: row.get(4)?,
194 timestamp: row.get(5)?,
195 end_timestamp: row.get(6)?,
196 })
197 })?
198 .collect::<Result<Vec<_>, _>>()?;
199
200 Ok(events)
201 })
202 }
203
204 pub fn get_project_state_stats(
207 &self,
208 from_timestamp: Option<i64>,
209 to_timestamp: Option<i64>,
210 ) -> Result<ProjectStateStats> {
211 self.with_conn(|conn| {
212 let mut transitions_by_status = std::collections::HashMap::new();
213 let mut time_by_status = std::collections::HashMap::new();
214 let mut transitions_by_agent = std::collections::HashMap::new();
215 let mut time_by_agent = std::collections::HashMap::new();
216 let mut tasks_touched = std::collections::HashSet::new();
217 let mut total_transitions = 0i64;
218 let mut total_time_ms = 0i64;
219
220 let mut sql = String::from(
222 "SELECT event, worker_id, task_id, timestamp, end_timestamp FROM task_state_sequence WHERE 1=1"
223 );
224 let mut param_values: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
225
226 if let Some(from_ts) = from_timestamp {
227 sql.push_str(&format!(" AND timestamp >= ?{}", param_values.len() + 1));
228 param_values.push(Box::new(from_ts));
229 }
230
231 if let Some(to_ts) = to_timestamp {
232 sql.push_str(&format!(" AND timestamp <= ?{}", param_values.len() + 1));
233 param_values.push(Box::new(to_ts));
234 }
235
236 let mut stmt = conn.prepare(&sql)?;
237 let param_refs: Vec<&dyn rusqlite::ToSql> = param_values.iter().map(|b| b.as_ref()).collect();
238
239 let mut rows = stmt.query(param_refs.as_slice())?;
240
241 while let Some(row) = rows.next()? {
242 let event: String = row.get(0)?;
243 let worker_id: Option<String> = row.get(1)?;
244 let task_id: String = row.get(2)?;
245 let timestamp: i64 = row.get(3)?;
246 let end_timestamp: Option<i64> = row.get(4)?;
247
248 total_transitions += 1;
249 tasks_touched.insert(task_id);
250
251 *transitions_by_status.entry(event.clone()).or_insert(0i64) += 1;
252
253 if let Some(ref agent) = worker_id {
254 *transitions_by_agent.entry(agent.clone()).or_insert(0i64) += 1;
255 }
256
257 if let Some(end_ts) = end_timestamp {
259 let duration = end_ts - timestamp;
260 total_time_ms += duration;
261 *time_by_status.entry(event).or_insert(0i64) += duration;
262
263 if let Some(agent) = worker_id {
264 *time_by_agent.entry(agent).or_insert(0i64) += duration;
265 }
266 }
267 }
268
269 Ok(ProjectStateStats {
270 total_transitions,
271 total_time_ms,
272 tasks_affected: tasks_touched.len() as i64,
273 transitions_by_status,
274 time_by_status_ms: time_by_status,
275 transitions_by_agent,
276 time_by_agent_ms: time_by_agent,
277 })
278 })
279 }
280}