1use crate::config::StatesConfig;
4use crate::db::{Database, now_ms};
5use crate::types::TaskSequenceEvent;
6use anyhow::Result;
7use rusqlite::{Connection, params};
8
9pub(crate) fn record_state_transition(
16 conn: &Connection,
17 task_id: &str,
18 status: &str,
19 worker_id: Option<&str>,
20 reason: Option<&str>,
21 states_config: &StatesConfig,
22) -> Result<i64> {
23 let now = now_ms();
24 let mut elapsed_added = 0i64;
25
26 let open_transition: Option<(i64, String, i64)> = conn
28 .query_row(
29 "SELECT id, status, timestamp FROM task_sequence
30 WHERE task_id = ?1 AND end_timestamp IS NULL AND status IS NOT NULL
31 ORDER BY id DESC LIMIT 1",
32 params![task_id],
33 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
34 )
35 .ok();
36
37 if let Some((open_id, prev_status, start_timestamp)) = open_transition {
38 conn.execute(
40 "UPDATE task_sequence SET end_timestamp = ?1 WHERE id = ?2",
41 params![now, open_id],
42 )?;
43
44 if states_config.is_timed_state(&prev_status) {
46 elapsed_added = now - start_timestamp;
47
48 conn.execute(
50 "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
51 WHERE id = ?3",
52 params![elapsed_added, now, task_id],
53 )?;
54 }
55 }
56
57 conn.execute(
59 "INSERT INTO task_sequence (task_id, worker_id, status, reason, timestamp)
60 VALUES (?1, ?2, ?3, ?4, ?5)",
61 params![task_id, worker_id, status, reason, now],
62 )?;
63
64 Ok(elapsed_added)
65}
66
67pub(crate) fn record_phase_transition(
71 conn: &Connection,
72 task_id: &str,
73 phase: &str,
74 worker_id: Option<&str>,
75 reason: Option<&str>,
76) -> Result<()> {
77 let now = now_ms();
78
79 conn.execute(
80 "INSERT INTO task_sequence (task_id, worker_id, phase, reason, timestamp)
81 VALUES (?1, ?2, ?3, ?4, ?5)",
82 params![task_id, worker_id, phase, reason, now],
83 )?;
84
85 Ok(())
86}
87
88#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
90pub struct ProjectStateStats {
91 pub total_transitions: i64,
92 pub total_time_ms: i64,
93 pub tasks_affected: i64,
94 pub transitions_by_status: std::collections::HashMap<String, i64>,
95 pub time_by_status_ms: std::collections::HashMap<String, i64>,
96 pub transitions_by_agent: std::collections::HashMap<String, i64>,
97 pub time_by_agent_ms: std::collections::HashMap<String, i64>,
98}
99
100impl Database {
101 pub fn get_task_sequence_history(&self, task_id: &str) -> Result<Vec<TaskSequenceEvent>> {
103 self.with_conn(|conn| {
104 let mut stmt = conn.prepare(
105 "SELECT id, task_id, worker_id, status, phase, reason, timestamp, end_timestamp
106 FROM task_sequence
107 WHERE task_id = ?1
108 ORDER BY id ASC",
109 )?;
110
111 let events = stmt
112 .query_map(params![task_id], |row| {
113 Ok(TaskSequenceEvent {
114 id: row.get(0)?,
115 task_id: row.get(1)?,
116 worker_id: row.get(2)?,
117 status: row.get(3)?,
118 phase: row.get(4)?,
119 reason: row.get(5)?,
120 timestamp: row.get(6)?,
121 end_timestamp: row.get(7)?,
122 })
123 })?
124 .collect::<Result<Vec<_>, _>>()?;
125
126 Ok(events)
127 })
128 }
129
130 pub fn get_task_state_history(&self, task_id: &str) -> Result<Vec<TaskSequenceEvent>> {
132 self.with_conn(|conn| {
133 let mut stmt = conn.prepare(
134 "SELECT id, task_id, worker_id, status, phase, reason, timestamp, end_timestamp
135 FROM task_sequence
136 WHERE task_id = ?1 AND status IS NOT NULL
137 ORDER BY id ASC",
138 )?;
139
140 let events = stmt
141 .query_map(params![task_id], |row| {
142 Ok(TaskSequenceEvent {
143 id: row.get(0)?,
144 task_id: row.get(1)?,
145 worker_id: row.get(2)?,
146 status: row.get(3)?,
147 phase: row.get(4)?,
148 reason: row.get(5)?,
149 timestamp: row.get(6)?,
150 end_timestamp: row.get(7)?,
151 })
152 })?
153 .collect::<Result<Vec<_>, _>>()?;
154
155 Ok(events)
156 })
157 }
158
159 pub fn get_current_state_duration(
162 &self,
163 task_id: &str,
164 states_config: &StatesConfig,
165 ) -> Result<Option<i64>> {
166 self.with_conn(|conn| {
167 let result: Option<(String, i64)> = conn
168 .query_row(
169 "SELECT status, timestamp FROM task_sequence
170 WHERE task_id = ?1 AND end_timestamp IS NULL AND status IS NOT NULL
171 ORDER BY id DESC LIMIT 1",
172 params![task_id],
173 |row| Ok((row.get(0)?, row.get(1)?)),
174 )
175 .ok();
176
177 match result {
178 Some((status, start_timestamp)) => {
179 if states_config.is_timed_state(&status) {
180 return Ok(Some(now_ms() - start_timestamp));
181 }
182 Ok(None)
183 }
184 None => Ok(None),
185 }
186 })
187 }
188
189 pub fn get_project_state_history(
192 &self,
193 from_timestamp: Option<i64>,
194 to_timestamp: Option<i64>,
195 state_filter: Option<&[String]>,
196 limit: Option<i64>,
197 ) -> Result<Vec<TaskSequenceEvent>> {
198 self.with_conn(|conn| {
199 let mut sql = String::from(
201 "SELECT id, task_id, worker_id, status, phase, reason, timestamp, end_timestamp
202 FROM task_sequence WHERE status IS NOT NULL",
203 );
204 let mut param_values: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
205
206 if let Some(from_ts) = from_timestamp {
207 sql.push_str(&format!(" AND timestamp >= ?{}", param_values.len() + 1));
208 param_values.push(Box::new(from_ts));
209 }
210
211 if let Some(to_ts) = to_timestamp {
212 sql.push_str(&format!(" AND timestamp <= ?{}", param_values.len() + 1));
213 param_values.push(Box::new(to_ts));
214 }
215
216 if let Some(states) = state_filter
217 && !states.is_empty()
218 {
219 let placeholders: Vec<String> = states
220 .iter()
221 .enumerate()
222 .map(|(i, _)| format!("?{}", param_values.len() + i + 1))
223 .collect();
224 sql.push_str(&format!(" AND status IN ({})", placeholders.join(", ")));
225 for state in states {
226 param_values.push(Box::new(state.clone()));
227 }
228 }
229
230 sql.push_str(" ORDER BY timestamp DESC, id DESC");
231
232 if let Some(lim) = limit {
233 sql.push_str(&format!(" LIMIT ?{}", param_values.len() + 1));
234 param_values.push(Box::new(lim));
235 }
236
237 let mut stmt = conn.prepare(&sql)?;
238
239 let param_refs: Vec<&dyn rusqlite::ToSql> =
241 param_values.iter().map(|b| b.as_ref()).collect();
242
243 let events = stmt
244 .query_map(param_refs.as_slice(), |row| {
245 Ok(TaskSequenceEvent {
246 id: row.get(0)?,
247 task_id: row.get(1)?,
248 worker_id: row.get(2)?,
249 status: row.get(3)?,
250 phase: row.get(4)?,
251 reason: row.get(5)?,
252 timestamp: row.get(6)?,
253 end_timestamp: row.get(7)?,
254 })
255 })?
256 .collect::<Result<Vec<_>, _>>()?;
257
258 Ok(events)
259 })
260 }
261
262 pub fn get_project_sequence_history(
264 &self,
265 from_timestamp: Option<i64>,
266 to_timestamp: Option<i64>,
267 limit: Option<i64>,
268 ) -> Result<Vec<TaskSequenceEvent>> {
269 self.with_conn(|conn| {
270 let mut sql = String::from(
271 "SELECT id, task_id, worker_id, status, phase, reason, timestamp, end_timestamp
272 FROM task_sequence WHERE 1=1",
273 );
274 let mut param_values: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
275
276 if let Some(from_ts) = from_timestamp {
277 sql.push_str(&format!(" AND timestamp >= ?{}", param_values.len() + 1));
278 param_values.push(Box::new(from_ts));
279 }
280
281 if let Some(to_ts) = to_timestamp {
282 sql.push_str(&format!(" AND timestamp <= ?{}", param_values.len() + 1));
283 param_values.push(Box::new(to_ts));
284 }
285
286 sql.push_str(" ORDER BY timestamp DESC, id DESC");
287
288 if let Some(lim) = limit {
289 sql.push_str(&format!(" LIMIT ?{}", param_values.len() + 1));
290 param_values.push(Box::new(lim));
291 }
292
293 let mut stmt = conn.prepare(&sql)?;
294 let param_refs: Vec<&dyn rusqlite::ToSql> =
295 param_values.iter().map(|b| b.as_ref()).collect();
296
297 let events = stmt
298 .query_map(param_refs.as_slice(), |row| {
299 Ok(TaskSequenceEvent {
300 id: row.get(0)?,
301 task_id: row.get(1)?,
302 worker_id: row.get(2)?,
303 status: row.get(3)?,
304 phase: row.get(4)?,
305 reason: row.get(5)?,
306 timestamp: row.get(6)?,
307 end_timestamp: row.get(7)?,
308 })
309 })?
310 .collect::<Result<Vec<_>, _>>()?;
311
312 Ok(events)
313 })
314 }
315
316 pub fn get_project_state_stats(
319 &self,
320 from_timestamp: Option<i64>,
321 to_timestamp: Option<i64>,
322 ) -> Result<ProjectStateStats> {
323 self.with_conn(|conn| {
324 let mut transitions_by_status = std::collections::HashMap::new();
325 let mut time_by_status = std::collections::HashMap::new();
326 let mut transitions_by_agent = std::collections::HashMap::new();
327 let mut time_by_agent = std::collections::HashMap::new();
328 let mut tasks_touched = std::collections::HashSet::new();
329 let mut total_transitions = 0i64;
330 let mut total_time_ms = 0i64;
331
332 let mut sql = String::from(
334 "SELECT status, worker_id, task_id, timestamp, end_timestamp
335 FROM task_sequence WHERE status IS NOT NULL",
336 );
337 let mut param_values: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
338
339 if let Some(from_ts) = from_timestamp {
340 sql.push_str(&format!(" AND timestamp >= ?{}", param_values.len() + 1));
341 param_values.push(Box::new(from_ts));
342 }
343
344 if let Some(to_ts) = to_timestamp {
345 sql.push_str(&format!(" AND timestamp <= ?{}", param_values.len() + 1));
346 param_values.push(Box::new(to_ts));
347 }
348
349 let mut stmt = conn.prepare(&sql)?;
350 let param_refs: Vec<&dyn rusqlite::ToSql> =
351 param_values.iter().map(|b| b.as_ref()).collect();
352
353 let mut rows = stmt.query(param_refs.as_slice())?;
354
355 while let Some(row) = rows.next()? {
356 let status: String = row.get(0)?;
357 let worker_id: Option<String> = row.get(1)?;
358 let task_id: String = row.get(2)?;
359 let timestamp: i64 = row.get(3)?;
360 let end_timestamp: Option<i64> = row.get(4)?;
361
362 total_transitions += 1;
363 tasks_touched.insert(task_id);
364
365 *transitions_by_status.entry(status.clone()).or_insert(0i64) += 1;
366
367 if let Some(ref agent) = worker_id {
368 *transitions_by_agent.entry(agent.clone()).or_insert(0i64) += 1;
369 }
370
371 if let Some(end_ts) = end_timestamp {
373 let duration = end_ts - timestamp;
374 total_time_ms += duration;
375 *time_by_status.entry(status).or_insert(0i64) += duration;
376
377 if let Some(agent) = worker_id {
378 *time_by_agent.entry(agent).or_insert(0i64) += duration;
379 }
380 }
381 }
382
383 Ok(ProjectStateStats {
384 total_transitions,
385 total_time_ms,
386 tasks_affected: tasks_touched.len() as i64,
387 transitions_by_status,
388 time_by_status_ms: time_by_status,
389 transitions_by_agent,
390 time_by_agent_ms: time_by_agent,
391 })
392 })
393 }
394}