Skip to main content

task_graph_mcp/db/
state_transitions.rs

1//! State and phase transition tracking for automatic time accumulation.
2
3use crate::config::StatesConfig;
4use crate::db::{Database, now_ms};
5use crate::types::TaskSequenceEvent;
6use anyhow::Result;
7use rusqlite::{Connection, params};
8
9/// Record a state transition and accumulate time if transitioning from a timed state.
10///
11/// Uses snapshot pattern: only records the new status value. Previous status
12/// can be determined by querying the previous row for the same task.
13///
14/// Returns the elapsed time added to time_actual_ms (0 if previous state was not timed).
15pub(crate) fn record_state_transition(
16    conn: &Connection,
17    task_id: &str,
18    status: &str,
19    worker_id: Option<&str>,
20    reason: Option<&str>,
21    states_config: &StatesConfig,
22) -> Result<i64> {
23    let now = now_ms();
24    let mut elapsed_added = 0i64;
25
26    // Find and close any open transition for this task (status-based)
27    let open_transition: Option<(i64, String, i64)> = conn
28        .query_row(
29            "SELECT id, status, timestamp FROM task_sequence
30             WHERE task_id = ?1 AND end_timestamp IS NULL AND status IS NOT NULL
31             ORDER BY id DESC LIMIT 1",
32            params![task_id],
33            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
34        )
35        .ok();
36
37    if let Some((open_id, prev_status, start_timestamp)) = open_transition {
38        // Close the previous transition
39        conn.execute(
40            "UPDATE task_sequence SET end_timestamp = ?1 WHERE id = ?2",
41            params![now, open_id],
42        )?;
43
44        // If previous state was a timed state, accumulate elapsed time
45        if states_config.is_timed_state(&prev_status) {
46            elapsed_added = now - start_timestamp;
47
48            // Add elapsed time to task's time_actual_ms
49            conn.execute(
50                "UPDATE tasks SET time_actual_ms = COALESCE(time_actual_ms, 0) + ?1, updated_at = ?2
51                 WHERE id = ?3",
52                params![elapsed_added, now, task_id],
53            )?;
54        }
55    }
56
57    // Insert the new transition (snapshot pattern - only new status)
58    conn.execute(
59        "INSERT INTO task_sequence (task_id, worker_id, status, reason, timestamp)
60         VALUES (?1, ?2, ?3, ?4, ?5)",
61        params![task_id, worker_id, status, reason, now],
62    )?;
63
64    Ok(elapsed_added)
65}
66
67/// Record a phase transition for audit purposes.
68///
69/// Uses snapshot pattern: only records the new phase value.
70pub(crate) fn record_phase_transition(
71    conn: &Connection,
72    task_id: &str,
73    phase: &str,
74    worker_id: Option<&str>,
75    reason: Option<&str>,
76) -> Result<()> {
77    let now = now_ms();
78
79    conn.execute(
80        "INSERT INTO task_sequence (task_id, worker_id, phase, reason, timestamp)
81         VALUES (?1, ?2, ?3, ?4, ?5)",
82        params![task_id, worker_id, phase, reason, now],
83    )?;
84
85    Ok(())
86}
87
88/// Statistics for project-wide state transitions.
89#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
90pub struct ProjectStateStats {
91    pub total_transitions: i64,
92    pub total_time_ms: i64,
93    pub tasks_affected: i64,
94    pub transitions_by_status: std::collections::HashMap<String, i64>,
95    pub time_by_status_ms: std::collections::HashMap<String, i64>,
96    pub transitions_by_agent: std::collections::HashMap<String, i64>,
97    pub time_by_agent_ms: std::collections::HashMap<String, i64>,
98}
99
100impl Database {
101    /// Get the unified sequence history for a task (both status and phase changes).
102    pub fn get_task_sequence_history(&self, task_id: &str) -> Result<Vec<TaskSequenceEvent>> {
103        self.with_conn(|conn| {
104            let mut stmt = conn.prepare(
105                "SELECT id, task_id, worker_id, status, phase, reason, timestamp, end_timestamp
106                 FROM task_sequence
107                 WHERE task_id = ?1
108                 ORDER BY id ASC",
109            )?;
110
111            let events = stmt
112                .query_map(params![task_id], |row| {
113                    Ok(TaskSequenceEvent {
114                        id: row.get(0)?,
115                        task_id: row.get(1)?,
116                        worker_id: row.get(2)?,
117                        status: row.get(3)?,
118                        phase: row.get(4)?,
119                        reason: row.get(5)?,
120                        timestamp: row.get(6)?,
121                        end_timestamp: row.get(7)?,
122                    })
123                })?
124                .collect::<Result<Vec<_>, _>>()?;
125
126            Ok(events)
127        })
128    }
129
130    /// Get the state transition history for a task (status changes only, for backward compat).
131    pub fn get_task_state_history(&self, task_id: &str) -> Result<Vec<TaskSequenceEvent>> {
132        self.with_conn(|conn| {
133            let mut stmt = conn.prepare(
134                "SELECT id, task_id, worker_id, status, phase, reason, timestamp, end_timestamp
135                 FROM task_sequence
136                 WHERE task_id = ?1 AND status IS NOT NULL
137                 ORDER BY id ASC",
138            )?;
139
140            let events = stmt
141                .query_map(params![task_id], |row| {
142                    Ok(TaskSequenceEvent {
143                        id: row.get(0)?,
144                        task_id: row.get(1)?,
145                        worker_id: row.get(2)?,
146                        status: row.get(3)?,
147                        phase: row.get(4)?,
148                        reason: row.get(5)?,
149                        timestamp: row.get(6)?,
150                        end_timestamp: row.get(7)?,
151                    })
152                })?
153                .collect::<Result<Vec<_>, _>>()?;
154
155            Ok(events)
156        })
157    }
158
159    /// Get the current duration in the current state (for active time tracking).
160    /// Only returns a duration if the current state is a timed state.
161    pub fn get_current_state_duration(
162        &self,
163        task_id: &str,
164        states_config: &StatesConfig,
165    ) -> Result<Option<i64>> {
166        self.with_conn(|conn| {
167            let result: Option<(String, i64)> = conn
168                .query_row(
169                    "SELECT status, timestamp FROM task_sequence
170                     WHERE task_id = ?1 AND end_timestamp IS NULL AND status IS NOT NULL
171                     ORDER BY id DESC LIMIT 1",
172                    params![task_id],
173                    |row| Ok((row.get(0)?, row.get(1)?)),
174                )
175                .ok();
176
177            match result {
178                Some((status, start_timestamp)) => {
179                    if states_config.is_timed_state(&status) {
180                        return Ok(Some(now_ms() - start_timestamp));
181                    }
182                    Ok(None)
183                }
184                None => Ok(None),
185            }
186        })
187    }
188
189    /// Get project-wide state transition history with optional time range filter.
190    /// Returns all state transitions across all tasks within the specified time range.
191    pub fn get_project_state_history(
192        &self,
193        from_timestamp: Option<i64>,
194        to_timestamp: Option<i64>,
195        state_filter: Option<&[String]>,
196        limit: Option<i64>,
197    ) -> Result<Vec<TaskSequenceEvent>> {
198        self.with_conn(|conn| {
199            // Build query dynamically based on filters
200            let mut sql = String::from(
201                "SELECT id, task_id, worker_id, status, phase, reason, timestamp, end_timestamp
202                 FROM task_sequence WHERE status IS NOT NULL",
203            );
204            let mut param_values: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
205
206            if let Some(from_ts) = from_timestamp {
207                sql.push_str(&format!(" AND timestamp >= ?{}", param_values.len() + 1));
208                param_values.push(Box::new(from_ts));
209            }
210
211            if let Some(to_ts) = to_timestamp {
212                sql.push_str(&format!(" AND timestamp <= ?{}", param_values.len() + 1));
213                param_values.push(Box::new(to_ts));
214            }
215
216            if let Some(states) = state_filter
217                && !states.is_empty()
218            {
219                let placeholders: Vec<String> = states
220                    .iter()
221                    .enumerate()
222                    .map(|(i, _)| format!("?{}", param_values.len() + i + 1))
223                    .collect();
224                sql.push_str(&format!(" AND status IN ({})", placeholders.join(", ")));
225                for state in states {
226                    param_values.push(Box::new(state.clone()));
227                }
228            }
229
230            sql.push_str(" ORDER BY timestamp DESC, id DESC");
231
232            if let Some(lim) = limit {
233                sql.push_str(&format!(" LIMIT ?{}", param_values.len() + 1));
234                param_values.push(Box::new(lim));
235            }
236
237            let mut stmt = conn.prepare(&sql)?;
238
239            // Convert Vec<Box<dyn ToSql>> to slice of references
240            let param_refs: Vec<&dyn rusqlite::ToSql> =
241                param_values.iter().map(|b| b.as_ref()).collect();
242
243            let events = stmt
244                .query_map(param_refs.as_slice(), |row| {
245                    Ok(TaskSequenceEvent {
246                        id: row.get(0)?,
247                        task_id: row.get(1)?,
248                        worker_id: row.get(2)?,
249                        status: row.get(3)?,
250                        phase: row.get(4)?,
251                        reason: row.get(5)?,
252                        timestamp: row.get(6)?,
253                        end_timestamp: row.get(7)?,
254                    })
255                })?
256                .collect::<Result<Vec<_>, _>>()?;
257
258            Ok(events)
259        })
260    }
261
262    /// Get project-wide sequence history (both status and phase changes).
263    pub fn get_project_sequence_history(
264        &self,
265        from_timestamp: Option<i64>,
266        to_timestamp: Option<i64>,
267        limit: Option<i64>,
268    ) -> Result<Vec<TaskSequenceEvent>> {
269        self.with_conn(|conn| {
270            let mut sql = String::from(
271                "SELECT id, task_id, worker_id, status, phase, reason, timestamp, end_timestamp
272                 FROM task_sequence WHERE 1=1",
273            );
274            let mut param_values: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
275
276            if let Some(from_ts) = from_timestamp {
277                sql.push_str(&format!(" AND timestamp >= ?{}", param_values.len() + 1));
278                param_values.push(Box::new(from_ts));
279            }
280
281            if let Some(to_ts) = to_timestamp {
282                sql.push_str(&format!(" AND timestamp <= ?{}", param_values.len() + 1));
283                param_values.push(Box::new(to_ts));
284            }
285
286            sql.push_str(" ORDER BY timestamp DESC, id DESC");
287
288            if let Some(lim) = limit {
289                sql.push_str(&format!(" LIMIT ?{}", param_values.len() + 1));
290                param_values.push(Box::new(lim));
291            }
292
293            let mut stmt = conn.prepare(&sql)?;
294            let param_refs: Vec<&dyn rusqlite::ToSql> =
295                param_values.iter().map(|b| b.as_ref()).collect();
296
297            let events = stmt
298                .query_map(param_refs.as_slice(), |row| {
299                    Ok(TaskSequenceEvent {
300                        id: row.get(0)?,
301                        task_id: row.get(1)?,
302                        worker_id: row.get(2)?,
303                        status: row.get(3)?,
304                        phase: row.get(4)?,
305                        reason: row.get(5)?,
306                        timestamp: row.get(6)?,
307                        end_timestamp: row.get(7)?,
308                    })
309                })?
310                .collect::<Result<Vec<_>, _>>()?;
311
312            Ok(events)
313        })
314    }
315
316    /// Get aggregate project statistics for state transitions within a time range.
317    /// Returns counts of transitions per state and per agent.
318    pub fn get_project_state_stats(
319        &self,
320        from_timestamp: Option<i64>,
321        to_timestamp: Option<i64>,
322    ) -> Result<ProjectStateStats> {
323        self.with_conn(|conn| {
324            let mut transitions_by_status = std::collections::HashMap::new();
325            let mut time_by_status = std::collections::HashMap::new();
326            let mut transitions_by_agent = std::collections::HashMap::new();
327            let mut time_by_agent = std::collections::HashMap::new();
328            let mut tasks_touched = std::collections::HashSet::new();
329            let mut total_transitions = 0i64;
330            let mut total_time_ms = 0i64;
331
332            // Build base query - only count status transitions for stats
333            let mut sql = String::from(
334                "SELECT status, worker_id, task_id, timestamp, end_timestamp
335                 FROM task_sequence WHERE status IS NOT NULL",
336            );
337            let mut param_values: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
338
339            if let Some(from_ts) = from_timestamp {
340                sql.push_str(&format!(" AND timestamp >= ?{}", param_values.len() + 1));
341                param_values.push(Box::new(from_ts));
342            }
343
344            if let Some(to_ts) = to_timestamp {
345                sql.push_str(&format!(" AND timestamp <= ?{}", param_values.len() + 1));
346                param_values.push(Box::new(to_ts));
347            }
348
349            let mut stmt = conn.prepare(&sql)?;
350            let param_refs: Vec<&dyn rusqlite::ToSql> =
351                param_values.iter().map(|b| b.as_ref()).collect();
352
353            let mut rows = stmt.query(param_refs.as_slice())?;
354
355            while let Some(row) = rows.next()? {
356                let status: String = row.get(0)?;
357                let worker_id: Option<String> = row.get(1)?;
358                let task_id: String = row.get(2)?;
359                let timestamp: i64 = row.get(3)?;
360                let end_timestamp: Option<i64> = row.get(4)?;
361
362                total_transitions += 1;
363                tasks_touched.insert(task_id);
364
365                *transitions_by_status.entry(status.clone()).or_insert(0i64) += 1;
366
367                if let Some(ref agent) = worker_id {
368                    *transitions_by_agent.entry(agent.clone()).or_insert(0i64) += 1;
369                }
370
371                // Calculate duration if we have an end timestamp
372                if let Some(end_ts) = end_timestamp {
373                    let duration = end_ts - timestamp;
374                    total_time_ms += duration;
375                    *time_by_status.entry(status).or_insert(0i64) += duration;
376
377                    if let Some(agent) = worker_id {
378                        *time_by_agent.entry(agent).or_insert(0i64) += duration;
379                    }
380                }
381            }
382
383            Ok(ProjectStateStats {
384                total_transitions,
385                total_time_ms,
386                tasks_affected: tasks_touched.len() as i64,
387                transitions_by_status,
388                time_by_status_ms: time_by_status,
389                transitions_by_agent,
390                time_by_agent_ms: time_by_agent,
391            })
392        })
393    }
394}