1use super::Database;
6use anyhow::Result;
7use rusqlite::params;
8use std::collections::HashMap;
9
/// Compact task summary produced by `Database::get_recent_tasks`.
///
/// Each field mirrors the `tasks` column of the same name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DashboardTask {
    /// Task identifier (`tasks.id`).
    pub id: String,
    /// Task title; `None` when the column is NULL.
    pub title: Option<String>,
    /// Status text exactly as stored (`tasks.status`).
    pub status: String,
    /// Priority value (`tasks.priority`).
    pub priority: i32,
}
18
/// One row of the paginated task list returned by `Database::query_tasks`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskListItem {
    /// Task identifier (`tasks.id`).
    pub id: String,
    /// Task title; `None` when the column is NULL.
    pub title: Option<String>,
    /// Status text exactly as stored (`tasks.status`).
    pub status: String,
    /// Priority value (`tasks.priority`).
    pub priority: i32,
    /// Worker the task is assigned to (`tasks.worker_id`), if any.
    pub worker_id: Option<String>,
    /// Raw `tasks.tags` text; `query_tasks` substitutes an empty string for NULL.
    /// `Database::dashboard_update_task` writes this column as a JSON array string.
    pub tags: String,
    /// Value of `tasks.created_at`.
    pub created_at: i64,
    /// Value of `tasks.updated_at` (dashboard writes use `now_ms()`).
    pub updated_at: i64,
}
31
/// Filter, sort and paging options accepted by `Database::query_tasks`.
///
/// String filters that are `None` or empty are treated as "no filter" by
/// `query_tasks`. The derived `Default` yields no filters, empty sort strings
/// and `page == 0` / `limit == 0`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TaskListQuery {
    /// Exact match on `tasks.status`.
    pub status: Option<String>,
    /// Not read by `query_tasks` in this file.
    pub phase: Option<String>,
    /// Comma-separated tags; a task matches when its `tags` text contains any of them.
    pub tags: Option<String>,
    /// Restricts results to tasks that `parent` has a `contains` dependency on.
    pub parent: Option<String>,
    /// Exact match on `tasks.worker_id`.
    pub owner: Option<String>,
    /// Sort key: `priority`, `created`/`created_at` or `updated`/`updated_at`;
    /// anything else falls back to priority-descending.
    pub sort_by: String,
    /// Sort direction: `asc` or `desc`.
    pub sort_order: String,
    /// 1-based page number.
    pub page: i32,
    /// Page size.
    pub limit: i32,
    /// Not read by `query_tasks` in this file.
    pub timed_filter: Option<bool>,
    /// Not read by `query_tasks` in this file.
    pub timed_states: Vec<String>,
}
49
50#[derive(Debug, Clone)]
52pub struct TaskListResult {
53 pub tasks: Vec<TaskListItem>,
54 pub total: i64,
55 pub page: i32,
56 pub limit: i32,
57 pub total_pages: i32,
58}
59
60#[derive(Debug, Clone)]
62pub struct ActivityEvent {
63 pub id: i64,
64 pub event_type: ActivityEventType,
65 pub timestamp: i64,
66 pub worker_id: Option<String>,
67 pub task_id: Option<String>,
68 pub file_path: Option<String>,
69 pub from_status: Option<String>,
70 pub to_status: Option<String>,
71 pub reason: Option<String>,
72}
73
/// Kind of entry in the activity feed.
///
/// The enum has no payload, so it is `Copy` and can be used as a hash-map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ActivityEventType {
    /// A task status change (sourced from `task_sequence`).
    TaskTransition,
    /// A file was claimed (a `claim_sequence` row whose `event` is `claimed`).
    FileClaim,
    /// A file was released (any other `claim_sequence` event).
    FileRelease,
}
81
82impl ActivityEventType {
83 pub fn as_str(&self) -> &'static str {
84 match self {
85 ActivityEventType::TaskTransition => "transition",
86 ActivityEventType::FileClaim => "claim",
87 ActivityEventType::FileRelease => "release",
88 }
89 }
90}
91
/// Filter and paging options accepted by `Database::query_activity`.
///
/// The derived `Default` yields no filters and `page == 0` / `limit == 0`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ActivityListQuery {
    /// `"transition"` selects task transitions only, `"file"` file events only;
    /// `None` selects both.
    pub event_type: Option<String>,
    /// Exact match on the transition status; file events have no status.
    pub status: Option<String>,
    /// Exact match on the `worker_id` of the source row.
    pub worker: Option<String>,
    /// Substring match on the transition's `task_id`.
    pub task: Option<String>,
    /// 1-based page number.
    pub page: i32,
    /// Page size.
    pub limit: i32,
}
102
103#[derive(Debug, Clone)]
105pub struct ActivityListResult {
106 pub events: Vec<ActivityEvent>,
107 pub total: i64,
108 pub page: i32,
109 pub limit: i32,
110 pub total_pages: i32,
111}
112
/// Headline activity numbers computed by `Database::get_activity_stats`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ActivityStats {
    /// `transitions_24h + file_events_24h`.
    pub total_events_24h: i64,
    /// `task_sequence` rows with a timestamp in the last 24 hours.
    pub transitions_24h: i64,
    /// `claim_sequence` rows with a timestamp in the last 24 hours.
    pub file_events_24h: i64,
    /// Workers whose `last_heartbeat` falls within the last 5 minutes.
    pub active_workers: i64,
    /// Last-24-hours `task_sequence` row counts keyed by (non-NULL) status.
    pub events_by_status: HashMap<String, i64>,
}
122
/// A recently active worker as listed by `Database::get_active_workers`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DashboardWorker {
    /// Worker identifier (`workers.id`).
    pub id: String,
    /// A non-NULL `current_thought` taken from one of the worker's `working`
    /// tasks, if any exists. Which task is picked is not specified by the query.
    pub current_thought: Option<String>,
    /// Number of the worker's tasks in status `working`.
    pub claim_count: i32,
}
130
/// A task currently held by a worker, as listed by
/// `Database::get_worker_claimed_tasks`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkerClaimedTask {
    /// Task identifier (`tasks.id`).
    pub id: String,
    /// Task title; `None` when the column is NULL.
    pub title: Option<String>,
    /// Status text as stored; the listing query only selects `working` tasks.
    pub status: String,
    /// Value of `tasks.current_thought`, if set.
    pub current_thought: Option<String>,
}
139
140impl Database {
141 pub fn get_task_stats(&self) -> Result<(i64, i64, i64)> {
143 self.with_conn(|conn| {
144 let total: i64 = conn.query_row(
145 "SELECT COUNT(*) FROM tasks WHERE deleted_at IS NULL",
146 [],
147 |row| row.get(0),
148 )?;
149
150 let working: i64 = conn.query_row(
151 "SELECT COUNT(*) FROM tasks WHERE status = 'working' AND deleted_at IS NULL",
152 [],
153 |row| row.get(0),
154 )?;
155
156 let completed: i64 = conn.query_row(
157 "SELECT COUNT(*) FROM tasks WHERE status = 'completed' AND deleted_at IS NULL",
158 [],
159 |row| row.get(0),
160 )?;
161
162 Ok((total, working, completed))
163 })
164 }
165
166 pub fn get_active_worker_count(&self) -> Result<i64> {
168 self.with_conn(|conn| {
169 let cutoff = super::now_ms() - (5 * 60 * 1000);
171 let count: i64 = conn.query_row(
172 "SELECT COUNT(*) FROM workers WHERE last_heartbeat > ?1",
173 params![cutoff],
174 |row| row.get(0),
175 )?;
176 Ok(count)
177 })
178 }
179
180 pub fn get_recent_tasks(&self, limit: i32) -> Result<Vec<DashboardTask>> {
182 self.with_conn(|conn| {
183 let mut stmt = conn.prepare(
184 "SELECT id, title, status, priority
185 FROM tasks
186 WHERE deleted_at IS NULL
187 ORDER BY updated_at DESC
188 LIMIT ?1",
189 )?;
190
191 let tasks = stmt
192 .query_map(params![limit], |row| {
193 let id: String = row.get(0)?;
194 let title: Option<String> = row.get(1)?;
195 let status: String = row.get(2)?;
196 let priority: i32 = row.get(3)?;
197 Ok(DashboardTask {
198 id,
199 title,
200 status,
201 priority,
202 })
203 })?
204 .filter_map(|r| r.ok())
205 .collect();
206
207 Ok(tasks)
208 })
209 }
210
211 pub fn get_active_workers(&self) -> Result<Vec<DashboardWorker>> {
213 self.with_conn(|conn| {
214 let cutoff = super::now_ms() - (5 * 60 * 1000);
216
217 let mut stmt = conn.prepare(
218 "SELECT w.id,
219 (SELECT current_thought FROM tasks WHERE worker_id = w.id AND status = 'working' AND current_thought IS NOT NULL LIMIT 1),
220 (SELECT COUNT(*) FROM tasks WHERE worker_id = w.id AND status = 'working')
221 FROM workers w
222 WHERE w.last_heartbeat > ?1
223 ORDER BY w.last_heartbeat DESC"
224 )?;
225
226 let workers = stmt
227 .query_map(params![cutoff], |row| {
228 let id: String = row.get(0)?;
229 let current_thought: Option<String> = row.get(1)?;
230 let claim_count: i32 = row.get(2)?;
231 Ok(DashboardWorker { id, current_thought, claim_count })
232 })?
233 .filter_map(|r| r.ok())
234 .collect();
235
236 Ok(workers)
237 })
238 }
239
240 pub fn query_tasks(&self, query: &TaskListQuery) -> Result<TaskListResult> {
242 self.with_conn(|conn| {
243 let mut sql = String::from(
244 "SELECT t.id, t.title, t.status, t.priority, t.worker_id, t.tags, t.created_at, t.updated_at
245 FROM tasks t
246 WHERE t.deleted_at IS NULL"
247 );
248
249 let mut count_sql = String::from(
250 "SELECT COUNT(*) FROM tasks t WHERE t.deleted_at IS NULL"
251 );
252
253 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
254 let mut param_idx = 1;
255
256 if let Some(ref status) = query.status
258 && !status.is_empty() {
259 let clause = format!(" AND t.status = ?{}", param_idx);
260 sql.push_str(&clause);
261 count_sql.push_str(&clause);
262 params_vec.push(Box::new(status.clone()));
263 param_idx += 1;
264 }
265
266 if let Some(ref owner) = query.owner
268 && !owner.is_empty() {
269 let clause = format!(" AND t.worker_id = ?{}", param_idx);
270 sql.push_str(&clause);
271 count_sql.push_str(&clause);
272 params_vec.push(Box::new(owner.clone()));
273 param_idx += 1;
274 }
275
276 if let Some(ref parent) = query.parent
278 && !parent.is_empty() {
279 let clause = format!(
280 " AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')",
281 param_idx
282 );
283 sql.push_str(&clause);
284 count_sql.push_str(&clause);
285 params_vec.push(Box::new(parent.clone()));
286 param_idx += 1;
287 }
288
289 if let Some(ref tags) = query.tags
291 && !tags.is_empty() {
292 let tag_list: Vec<&str> = tags.split(',').map(|t| t.trim()).filter(|t| !t.is_empty()).collect();
293 if !tag_list.is_empty() {
294 let mut tag_conditions = Vec::new();
295 for tag in tag_list {
296 tag_conditions.push(format!("t.tags LIKE '%' || ?{} || '%'", param_idx));
297 params_vec.push(Box::new(tag.to_string()));
298 param_idx += 1;
299 }
300 let clause = format!(" AND ({})", tag_conditions.join(" OR "));
301 sql.push_str(&clause);
302 count_sql.push_str(&clause);
303 }
304 }
305
306 let order_clause = match (query.sort_by.as_str(), query.sort_order.as_str()) {
308 ("priority", "asc") => " ORDER BY t.priority ASC, t.created_at DESC",
309 ("priority", "desc") | ("priority", _) => " ORDER BY t.priority DESC, t.created_at DESC",
310 ("created", "asc") | ("created_at", "asc") => " ORDER BY t.created_at ASC",
311 ("created", "desc") | ("created_at", "desc") => " ORDER BY t.created_at DESC",
312 ("updated", "asc") | ("updated_at", "asc") => " ORDER BY t.updated_at ASC",
313 ("updated", "desc") | ("updated_at", "desc") => " ORDER BY t.updated_at DESC",
314 _ => " ORDER BY t.priority DESC, t.created_at DESC",
315 };
316 sql.push_str(order_clause);
317
318 let offset = (query.page - 1) * query.limit;
320 sql.push_str(&format!(" LIMIT {} OFFSET {}", query.limit, offset));
321
322 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
324 let total: i64 = conn.query_row(&count_sql, params_refs.as_slice(), |row| row.get(0))?;
325
326 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
328 let mut stmt = conn.prepare(&sql)?;
329
330 let tasks = stmt
331 .query_map(params_refs.as_slice(), |row| {
332 Ok(TaskListItem {
333 id: row.get(0)?,
334 title: row.get(1)?,
335 status: row.get(2)?,
336 priority: row.get(3)?,
337 worker_id: row.get(4)?,
338 tags: row.get::<_, Option<String>>(5)?.unwrap_or_default(),
339 created_at: row.get(6)?,
340 updated_at: row.get(7)?,
341 })
342 })?
343 .filter_map(|r| r.ok())
344 .collect();
345
346 let total_pages = ((total as f64) / (query.limit as f64)).ceil() as i32;
347
348 Ok(TaskListResult {
349 tasks,
350 total,
351 page: query.page,
352 limit: query.limit,
353 total_pages,
354 })
355 })
356 }
357
358 pub fn get_worker_claimed_tasks(&self, worker_id: &str) -> Result<Vec<WorkerClaimedTask>> {
360 self.with_conn(|conn| {
361 let mut stmt = conn.prepare(
362 "SELECT id, title, status, current_thought
363 FROM tasks
364 WHERE worker_id = ?1 AND status = 'working' AND deleted_at IS NULL
365 ORDER BY claimed_at DESC",
366 )?;
367
368 let tasks = stmt
369 .query_map(params![worker_id], |row| {
370 let id: String = row.get(0)?;
371 let title: Option<String> = row.get(1)?;
372 let status: String = row.get(2)?;
373 let current_thought: Option<String> = row.get(3)?;
374 Ok(WorkerClaimedTask {
375 id,
376 title,
377 status,
378 current_thought,
379 })
380 })?
381 .filter_map(|r| r.ok())
382 .collect();
383
384 Ok(tasks)
385 })
386 }
387
388 pub fn dashboard_update_task(
391 &self,
392 task_id: &str,
393 status: Option<&str>,
394 priority: Option<i32>,
395 description: Option<&str>,
396 tags: Option<Vec<String>>,
397 ) -> Result<()> {
398 let now = super::now_ms();
399
400 self.with_conn(|conn| {
401 let mut updates = vec!["updated_at = ?1".to_string()];
403 let mut param_idx = 2;
404
405 if status.is_some() {
406 updates.push(format!("status = ?{}", param_idx));
407 param_idx += 1;
408 }
409 if priority.is_some() {
410 updates.push(format!("priority = ?{}", param_idx));
411 param_idx += 1;
412 }
413 if description.is_some() {
414 updates.push(format!("description = ?{}", param_idx));
415 param_idx += 1;
416 }
417 if tags.is_some() {
418 updates.push(format!("tags = ?{}", param_idx));
419 param_idx += 1;
420 }
421
422 let sql = format!(
423 "UPDATE tasks SET {} WHERE id = ?{}",
424 updates.join(", "),
425 param_idx
426 );
427
428 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
430 params_vec.push(Box::new(now));
431
432 if let Some(s) = status {
433 params_vec.push(Box::new(s.to_string()));
434 }
435 if let Some(p) = priority {
436 params_vec.push(Box::new(p));
437 }
438 if let Some(d) = description {
439 params_vec.push(Box::new(d.to_string()));
440 }
441 if let Some(t) = tags {
442 params_vec.push(Box::new(serde_json::to_string(&t)?));
443 }
444 params_vec.push(Box::new(task_id.to_string()));
445
446 let params_refs: Vec<&dyn rusqlite::ToSql> =
447 params_vec.iter().map(|b| b.as_ref()).collect();
448 let rows_affected = conn.execute(&sql, params_refs.as_slice())?;
449
450 if rows_affected == 0 {
451 return Err(anyhow::anyhow!("Task not found"));
452 }
453
454 Ok(())
455 })
456 }
457
458 pub fn dashboard_delete_task(&self, task_id: &str) -> Result<()> {
460 let now = super::now_ms();
461
462 self.with_conn(|conn| {
463 let child_count: i32 = conn.query_row(
465 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
466 params![task_id],
467 |row| row.get(0),
468 )?;
469
470 if child_count > 0 {
471 return Err(anyhow::anyhow!("Task has children; delete them first"));
472 }
473
474 let rows_affected = conn.execute(
475 "UPDATE tasks SET deleted_at = ?1, updated_at = ?1 WHERE id = ?2 AND deleted_at IS NULL",
476 params![now, task_id],
477 )?;
478
479 if rows_affected == 0 {
480 return Err(anyhow::anyhow!("Task not found or already deleted"));
481 }
482
483 Ok(())
484 })
485 }
486
487 pub fn dashboard_force_release_task(&self, task_id: &str) -> Result<()> {
490 let now = super::now_ms();
491
492 self.with_conn(|conn| {
493 let rows_affected = conn.execute(
494 "UPDATE tasks SET
495 status = 'pending',
496 worker_id = NULL,
497 claimed_at = NULL,
498 current_thought = NULL,
499 updated_at = ?1
500 WHERE id = ?2 AND deleted_at IS NULL",
501 params![now, task_id],
502 )?;
503
504 if rows_affected == 0 {
505 return Err(anyhow::anyhow!("Task not found or already deleted"));
506 }
507
508 Ok(())
509 })
510 }
511
512 pub fn get_activity_stats(&self) -> Result<ActivityStats> {
514 let now = super::now_ms();
515 let cutoff_24h = now - (24 * 60 * 60 * 1000);
516
517 self.with_conn(|conn| {
518 let transitions_24h: i64 = conn.query_row(
520 "SELECT COUNT(*) FROM task_sequence WHERE timestamp >= ?1",
521 params![cutoff_24h],
522 |row| row.get(0),
523 )?;
524
525 let file_events_24h: i64 = conn.query_row(
527 "SELECT COUNT(*) FROM claim_sequence WHERE timestamp >= ?1",
528 params![cutoff_24h],
529 |row| row.get(0),
530 )?;
531
532 let total_events_24h = transitions_24h + file_events_24h;
533
534 let worker_cutoff = now - (5 * 60 * 1000);
536 let active_workers: i64 = conn.query_row(
537 "SELECT COUNT(*) FROM workers WHERE last_heartbeat >= ?1",
538 params![worker_cutoff],
539 |row| row.get(0),
540 )?;
541
542 let mut events_by_status = HashMap::new();
544 let mut stmt = conn.prepare(
545 "SELECT status, COUNT(*) FROM task_sequence
546 WHERE timestamp >= ?1 AND status IS NOT NULL GROUP BY status",
547 )?;
548 let mut rows = stmt.query(params![cutoff_24h])?;
549 while let Some(row) = rows.next()? {
550 let status: String = row.get(0)?;
551 let count: i64 = row.get(1)?;
552 events_by_status.insert(status, count);
553 }
554
555 Ok(ActivityStats {
556 total_events_24h,
557 transitions_24h,
558 file_events_24h,
559 active_workers,
560 events_by_status,
561 })
562 })
563 }
564
565 pub fn query_activity(&self, query: &ActivityListQuery) -> Result<ActivityListResult> {
567 self.with_conn(|conn| {
568 let mut events = Vec::new();
572 let mut total: i64 = 0;
573
574 let include_transitions =
576 query.event_type.is_none() || query.event_type.as_deref() == Some("transition");
577 let include_files =
578 query.event_type.is_none() || query.event_type.as_deref() == Some("file");
579
580 if include_transitions {
582 let mut sql = String::from(
583 "SELECT id, task_id, worker_id, status, reason, timestamp
584 FROM task_sequence WHERE status IS NOT NULL",
585 );
586 let mut count_sql =
587 String::from("SELECT COUNT(*) FROM task_sequence WHERE status IS NOT NULL");
588 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
589 let mut param_idx = 1;
590
591 if let Some(ref status) = query.status
593 && !status.is_empty()
594 {
595 sql.push_str(&format!(" AND status = ?{}", param_idx));
596 count_sql.push_str(&format!(" AND status = ?{}", param_idx));
597 params_vec.push(Box::new(status.clone()));
598 param_idx += 1;
599 }
600
601 if let Some(ref worker) = query.worker
603 && !worker.is_empty()
604 {
605 sql.push_str(&format!(" AND worker_id = ?{}", param_idx));
606 count_sql.push_str(&format!(" AND worker_id = ?{}", param_idx));
607 params_vec.push(Box::new(worker.clone()));
608 param_idx += 1;
609 }
610
611 if let Some(ref task) = query.task
613 && !task.is_empty()
614 {
615 sql.push_str(&format!(" AND task_id LIKE '%' || ?{} || '%'", param_idx));
616 count_sql.push_str(&format!(" AND task_id LIKE '%' || ?{} || '%'", param_idx));
617 params_vec.push(Box::new(task.clone()));
618 let _ = param_idx; }
620
621 sql.push_str(" ORDER BY timestamp DESC");
622
623 let params_refs: Vec<&dyn rusqlite::ToSql> =
625 params_vec.iter().map(|b| b.as_ref()).collect();
626 let trans_count: i64 =
627 conn.query_row(&count_sql, params_refs.as_slice(), |row| row.get(0))?;
628 total += trans_count;
629
630 if !include_files {
632 let offset = (query.page - 1) * query.limit;
633 sql.push_str(&format!(" LIMIT {} OFFSET {}", query.limit, offset));
634 }
635
636 let params_refs: Vec<&dyn rusqlite::ToSql> =
637 params_vec.iter().map(|b| b.as_ref()).collect();
638 let mut stmt = conn.prepare(&sql)?;
639 let mut rows = stmt.query(params_refs.as_slice())?;
640
641 while let Some(row) = rows.next()? {
642 let id: i64 = row.get(0)?;
643 let task_id: String = row.get(1)?;
644 let worker_id: Option<String> = row.get(2)?;
645 let event: String = row.get(3)?;
646 let reason: Option<String> = row.get(4)?;
647 let timestamp: i64 = row.get(5)?;
648
649 events.push(ActivityEvent {
650 id,
651 event_type: ActivityEventType::TaskTransition,
652 timestamp,
653 worker_id,
654 task_id: Some(task_id),
655 file_path: None,
656 from_status: None,
657 to_status: Some(event),
658 reason,
659 });
660 }
661 }
662
663 if include_files && query.status.is_none() {
665 let mut sql = String::from(
666 "SELECT id, file_path, worker_id, event, reason, timestamp
667 FROM claim_sequence WHERE 1=1",
668 );
669 let mut count_sql = String::from("SELECT COUNT(*) FROM claim_sequence WHERE 1=1");
670 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
671 let param_idx = 1;
672
673 if let Some(ref worker) = query.worker
675 && !worker.is_empty()
676 {
677 sql.push_str(&format!(" AND worker_id = ?{}", param_idx));
678 count_sql.push_str(&format!(" AND worker_id = ?{}", param_idx));
679 params_vec.push(Box::new(worker.clone()));
680 }
681
682 sql.push_str(" ORDER BY timestamp DESC");
686
687 let params_refs: Vec<&dyn rusqlite::ToSql> =
689 params_vec.iter().map(|b| b.as_ref()).collect();
690 let file_count: i64 =
691 conn.query_row(&count_sql, params_refs.as_slice(), |row| row.get(0))?;
692 total += file_count;
693
694 if query.task.is_none() || query.task.as_ref().map(|t| t.is_empty()).unwrap_or(true)
696 {
697 let params_refs: Vec<&dyn rusqlite::ToSql> =
698 params_vec.iter().map(|b| b.as_ref()).collect();
699 let mut stmt = conn.prepare(&sql)?;
700 let mut rows = stmt.query(params_refs.as_slice())?;
701
702 while let Some(row) = rows.next()? {
703 let id: i64 = row.get(0)?;
704 let file_path: String = row.get(1)?;
705 let worker_id: String = row.get(2)?;
706 let event: String = row.get(3)?;
707 let reason: Option<String> = row.get(4)?;
708 let timestamp: i64 = row.get(5)?;
709
710 let event_type = if event == "claimed" {
711 ActivityEventType::FileClaim
712 } else {
713 ActivityEventType::FileRelease
714 };
715
716 events.push(ActivityEvent {
717 id: id + 1_000_000_000, event_type,
719 timestamp,
720 worker_id: Some(worker_id),
721 task_id: None,
722 file_path: Some(file_path),
723 from_status: None,
724 to_status: None,
725 reason,
726 });
727 }
728 }
729 }
730
731 events.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
733
734 let offset = ((query.page - 1) * query.limit) as usize;
736 let limit = query.limit as usize;
737 let paginated_events: Vec<ActivityEvent> =
738 events.into_iter().skip(offset).take(limit).collect();
739
740 let total_pages = ((total as f64) / (query.limit as f64)).ceil() as i32;
741
742 Ok(ActivityListResult {
743 events: paginated_events,
744 total,
745 page: query.page,
746 limit: query.limit,
747 total_pages: total_pages.max(1),
748 })
749 })
750 }
751
752 pub fn get_all_file_marks(&self) -> Result<Vec<DashboardFileMark>> {
754 self.with_conn(|conn| {
755 let mut stmt = conn.prepare(
756 "SELECT file_path, worker_id, reason, locked_at, task_id
757 FROM file_locks
758 ORDER BY locked_at DESC",
759 )?;
760
761 let marks = stmt
762 .query_map([], |row| {
763 Ok(DashboardFileMark {
764 file_path: row.get(0)?,
765 worker_id: row.get(1)?,
766 reason: row.get(2)?,
767 locked_at: row.get(3)?,
768 task_id: row.get(4)?,
769 })
770 })?
771 .filter_map(|r| r.ok())
772 .collect();
773
774 Ok(marks)
775 })
776 }
777
778 pub fn get_file_marks_stats(&self) -> Result<FileMarksStats> {
780 self.with_conn(|conn| {
781 let total_marks: i64 =
782 conn.query_row("SELECT COUNT(*) FROM file_locks", [], |row| row.get(0))?;
783
784 let unique_agents: i64 = conn.query_row(
785 "SELECT COUNT(DISTINCT worker_id) FROM file_locks",
786 [],
787 |row| row.get(0),
788 )?;
789
790 let with_tasks: i64 = conn.query_row(
791 "SELECT COUNT(*) FROM file_locks WHERE task_id IS NOT NULL",
792 [],
793 |row| row.get(0),
794 )?;
795
796 let now = super::now_ms();
798 let stale_cutoff = now - (60 * 60 * 1000); let stale_marks: i64 = conn.query_row(
800 "SELECT COUNT(*) FROM file_locks WHERE locked_at < ?1",
801 params![stale_cutoff],
802 |row| row.get(0),
803 )?;
804
805 Ok(FileMarksStats {
806 total_marks,
807 unique_agents,
808 with_tasks,
809 stale_marks,
810 })
811 })
812 }
813
814 pub fn force_unmark_file(&self, file_path: &str) -> Result<bool> {
817 let now = super::now_ms();
818
819 self.with_conn_mut(|conn| {
820 let tx = conn.transaction()?;
821
822 let owner: Option<String> = tx.query_row(
824 "SELECT worker_id FROM file_locks WHERE file_path = ?1",
825 params![file_path],
826 |row| row.get(0),
827 ).ok();
828
829 let deleted = tx.execute(
830 "DELETE FROM file_locks WHERE file_path = ?1",
831 params![file_path],
832 )?;
833
834 if deleted > 0
835 && let Some(worker_id) = owner {
836 let claim_id: Option<i64> = tx.query_row(
838 "SELECT MAX(id) FROM claim_sequence
839 WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
840 params![file_path, &worker_id],
841 |row| row.get(0),
842 ).ok().flatten();
843
844 tx.execute(
846 "UPDATE claim_sequence SET end_timestamp = ?1
847 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
848 params![now, file_path, &worker_id],
849 )?;
850
851 tx.execute(
853 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
854 VALUES (?1, ?2, 'released', 'Force-unmarked via dashboard', ?3, ?4)",
855 params![file_path, &worker_id, now, claim_id],
856 )?;
857 }
858
859 tx.commit()?;
860 Ok(deleted > 0)
861 })
862 }
863
864 pub fn get_metrics_overview(&self) -> Result<MetricsOverview> {
868 self.with_conn(|conn| {
869 let row: (i64, i64, f64, i64, i64, i64) = conn.query_row(
870 "SELECT
871 COUNT(*) as total_tasks,
872 SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed_tasks,
873 COALESCE(SUM(cost_usd), 0.0) as total_cost,
874 COALESCE(SUM(time_actual_ms), 0) as total_time,
875 COALESCE(SUM(points), 0) as total_points,
876 COALESCE(SUM(CASE WHEN status = 'completed' THEN points ELSE 0 END), 0) as completed_points
877 FROM tasks
878 WHERE deleted_at IS NULL",
879 [],
880 |row| Ok((
881 row.get(0)?,
882 row.get(1)?,
883 row.get(2)?,
884 row.get(3)?,
885 row.get(4)?,
886 row.get(5)?,
887 )),
888 )?;
889
890 Ok(MetricsOverview {
891 total_tasks: row.0,
892 completed_tasks: row.1,
893 total_cost_usd: row.2,
894 total_time_ms: row.3,
895 total_points: row.4,
896 completed_points: row.5,
897 })
898 })
899 }
900
901 pub fn get_status_distribution(&self) -> Result<HashMap<String, i64>> {
903 self.with_conn(|conn| {
904 let mut stmt = conn.prepare(
905 "SELECT status, COUNT(*) as count
906 FROM tasks
907 WHERE deleted_at IS NULL
908 GROUP BY status",
909 )?;
910
911 let mut distribution = HashMap::new();
912 let rows = stmt.query_map([], |row| {
913 let status: String = row.get(0)?;
914 let count: i64 = row.get(1)?;
915 Ok((status, count))
916 })?;
917
918 for row in rows {
919 let (status, count) = row?;
920 distribution.insert(status, count);
921 }
922
923 Ok(distribution)
924 })
925 }
926
927 pub fn get_velocity(&self, period: &str, num_periods: i32) -> Result<Vec<VelocityDataPoint>> {
930 self.with_conn(|conn| {
931 let now = super::now_ms();
932 let period_ms: i64 = match period {
933 "week" => 7 * 24 * 60 * 60 * 1000,
934 _ => 24 * 60 * 60 * 1000, };
936
937 let mut data_points = Vec::new();
938
939 for i in 0..num_periods {
940 let period_end = now - (i as i64 * period_ms);
941 let period_start = period_end - period_ms;
942
943 let (count, points): (i64, i64) = conn.query_row(
944 "SELECT COUNT(*), COALESCE(SUM(points), 0)
945 FROM tasks
946 WHERE deleted_at IS NULL
947 AND status = 'completed'
948 AND completed_at >= ?1
949 AND completed_at < ?2",
950 params![period_start, period_end],
951 |row| Ok((row.get(0)?, row.get(1)?)),
952 )?;
953
954 let label = if period == "week" {
956 if i == 0 {
957 "This week".to_string()
958 } else if i == 1 {
959 "Last week".to_string()
960 } else {
961 format!("{} weeks ago", i)
962 }
963 } else if i == 0 {
964 "Today".to_string()
965 } else if i == 1 {
966 "Yesterday".to_string()
967 } else {
968 format!("{} days ago", i)
969 };
970
971 data_points.push(VelocityDataPoint {
972 period_label: label,
973 completed_count: count,
974 total_points: points,
975 });
976 }
977
978 data_points.reverse();
980 Ok(data_points)
981 })
982 }
983
984 pub fn get_time_in_status(&self) -> Result<Vec<TimeInStatusStats>> {
986 self.with_conn(|conn| {
987 let mut stmt = conn.prepare(
988 "SELECT
989 status,
990 AVG(COALESCE(end_timestamp, ?1) - timestamp) as avg_duration,
991 SUM(COALESCE(end_timestamp, ?1) - timestamp) as total_duration,
992 COUNT(*) as transition_count
993 FROM task_sequence
994 WHERE status IS NOT NULL
995 GROUP BY status
996 ORDER BY avg_duration DESC",
997 )?;
998
999 let now = super::now_ms();
1000 let stats = stmt
1001 .query_map(params![now], |row| {
1002 Ok(TimeInStatusStats {
1003 status: row.get(0)?,
1004 avg_duration_ms: row.get::<_, f64>(1)? as i64,
1005 total_duration_ms: row.get::<_, f64>(2)? as i64,
1006 transition_count: row.get(3)?,
1007 })
1008 })?
1009 .filter_map(|r| r.ok())
1010 .collect();
1011
1012 Ok(stats)
1013 })
1014 }
1015
1016 pub fn get_cost_by_agent(&self) -> Result<Vec<AgentCostStats>> {
1018 self.with_conn(|conn| {
1019 let mut stmt = conn.prepare(
1020 "SELECT
1021 worker_id,
1022 COALESCE(SUM(cost_usd), 0.0) as total_cost,
1023 COUNT(*) as task_count,
1024 SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed_count,
1025 COALESCE(SUM(time_actual_ms), 0) as total_time
1026 FROM tasks
1027 WHERE deleted_at IS NULL AND worker_id IS NOT NULL
1028 GROUP BY worker_id
1029 ORDER BY total_cost DESC",
1030 )?;
1031
1032 let stats = stmt
1033 .query_map([], |row| {
1034 Ok(AgentCostStats {
1035 worker_id: row.get(0)?,
1036 total_cost_usd: row.get(1)?,
1037 task_count: row.get(2)?,
1038 completed_count: row.get(3)?,
1039 total_time_ms: row.get(4)?,
1040 })
1041 })?
1042 .filter_map(|r| r.ok())
1043 .collect();
1044
1045 Ok(stats)
1046 })
1047 }
1048
1049 pub fn get_custom_metrics(&self) -> Result<CustomMetricsAggregate> {
1051 self.with_conn(|conn| {
1052 let row: (i64, i64, i64, i64, i64, i64, i64, i64) = conn.query_row(
1053 "SELECT
1054 COALESCE(SUM(metric_0), 0),
1055 COALESCE(SUM(metric_1), 0),
1056 COALESCE(SUM(metric_2), 0),
1057 COALESCE(SUM(metric_3), 0),
1058 COALESCE(SUM(metric_4), 0),
1059 COALESCE(SUM(metric_5), 0),
1060 COALESCE(SUM(metric_6), 0),
1061 COALESCE(SUM(metric_7), 0)
1062 FROM tasks
1063 WHERE deleted_at IS NULL",
1064 [],
1065 |row| {
1066 Ok((
1067 row.get(0)?,
1068 row.get(1)?,
1069 row.get(2)?,
1070 row.get(3)?,
1071 row.get(4)?,
1072 row.get(5)?,
1073 row.get(6)?,
1074 row.get(7)?,
1075 ))
1076 },
1077 )?;
1078
1079 Ok(CustomMetricsAggregate {
1080 metrics: [row.0, row.1, row.2, row.3, row.4, row.5, row.6, row.7],
1081 })
1082 })
1083 }
1084
1085 pub fn get_dependency_graph(
1090 &self,
1091 dep_type: Option<&str>,
1092 focus_task: Option<&str>,
1093 depth: i32,
1094 ) -> Result<DependencyGraph> {
1095 self.with_conn(|conn| {
1096 let mut nodes: Vec<GraphNode> = Vec::new();
1097 let mut edges: Vec<GraphEdge> = Vec::new();
1098 let mut seen_tasks: std::collections::HashSet<String> = std::collections::HashSet::new();
1099
1100 let type_clause = match dep_type {
1102 Some("blocks") => "AND d.dep_type = 'blocks'",
1103 Some("follows") => "AND d.dep_type = 'follows'",
1104 Some("contains") => "AND d.dep_type = 'contains'",
1105 _ => "", };
1107
1108 if let Some(focus_id) = focus_task {
1109 let actual_depth = if depth < 0 { 100 } else { depth };
1111
1112 if let Ok(task) = conn.query_row(
1114 "SELECT id, title, status, priority FROM tasks WHERE id = ?1 AND deleted_at IS NULL",
1115 params![focus_id],
1116 |row| Ok(GraphNode {
1117 id: row.get(0)?,
1118 title: row.get::<_, Option<String>>(1)?.unwrap_or_default(),
1119 status: row.get(2)?,
1120 priority: row.get(3)?,
1121 }),
1122 ) {
1123 seen_tasks.insert(task.id.clone());
1124 nodes.push(task);
1125 }
1126
1127 let mut current_level: Vec<String> = vec![focus_id.to_string()];
1129 for _ in 0..actual_depth {
1130 if current_level.is_empty() { break; }
1131 let mut next_level: Vec<String> = Vec::new();
1132
1133 for tid in ¤t_level {
1134 let sql = format!(
1135 "SELECT d.from_task_id, d.dep_type, t.id, t.title, t.status, t.priority
1136 FROM dependencies d
1137 JOIN tasks t ON d.from_task_id = t.id
1138 WHERE d.to_task_id = ?1 AND t.deleted_at IS NULL {}",
1139 type_clause
1140 );
1141
1142 let mut stmt = conn.prepare(&sql)?;
1143 let rows = stmt.query_map(params![tid], |row| {
1144 Ok((
1145 row.get::<_, String>(0)?,
1146 row.get::<_, String>(1)?,
1147 row.get::<_, String>(2)?,
1148 row.get::<_, Option<String>>(3)?,
1149 row.get::<_, String>(4)?,
1150 row.get::<_, i32>(5)?,
1151 ))
1152 })?;
1153
1154 for row in rows.flatten() {
1155 let (from_id, dep_type_str, task_id, title, status, priority) = row;
1156
1157 edges.push(GraphEdge {
1158 from_id: from_id.clone(),
1159 to_id: tid.clone(),
1160 dep_type: dep_type_str,
1161 });
1162
1163 if !seen_tasks.contains(&task_id) {
1164 seen_tasks.insert(task_id.clone());
1165 nodes.push(GraphNode {
1166 id: task_id.clone(),
1167 title: title.unwrap_or_default(),
1168 status,
1169 priority,
1170 });
1171 next_level.push(task_id);
1172 }
1173 }
1174 }
1175 current_level = next_level;
1176 }
1177
1178 let mut current_level: Vec<String> = vec![focus_id.to_string()];
1180 for _ in 0..actual_depth {
1181 if current_level.is_empty() { break; }
1182 let mut next_level: Vec<String> = Vec::new();
1183
1184 for tid in ¤t_level {
1185 let sql = format!(
1186 "SELECT d.to_task_id, d.dep_type, t.id, t.title, t.status, t.priority
1187 FROM dependencies d
1188 JOIN tasks t ON d.to_task_id = t.id
1189 WHERE d.from_task_id = ?1 AND t.deleted_at IS NULL {}",
1190 type_clause
1191 );
1192
1193 let mut stmt = conn.prepare(&sql)?;
1194 let rows = stmt.query_map(params![tid], |row| {
1195 Ok((
1196 row.get::<_, String>(0)?,
1197 row.get::<_, String>(1)?,
1198 row.get::<_, String>(2)?,
1199 row.get::<_, Option<String>>(3)?,
1200 row.get::<_, String>(4)?,
1201 row.get::<_, i32>(5)?,
1202 ))
1203 })?;
1204
1205 for row in rows.flatten() {
1206 let (to_id, dep_type_str, task_id, title, status, priority) = row;
1207
1208 edges.push(GraphEdge {
1209 from_id: tid.clone(),
1210 to_id: to_id.clone(),
1211 dep_type: dep_type_str,
1212 });
1213
1214 if !seen_tasks.contains(&task_id) {
1215 seen_tasks.insert(task_id.clone());
1216 nodes.push(GraphNode {
1217 id: task_id.clone(),
1218 title: title.unwrap_or_default(),
1219 status,
1220 priority,
1221 });
1222 next_level.push(task_id);
1223 }
1224 }
1225 }
1226 current_level = next_level;
1227 }
1228 } else {
1229 let sql = format!(
1231 "SELECT d.from_task_id, d.to_task_id, d.dep_type
1232 FROM dependencies d
1233 JOIN tasks t1 ON d.from_task_id = t1.id
1234 JOIN tasks t2 ON d.to_task_id = t2.id
1235 WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL {}",
1236 type_clause
1237 );
1238
1239 let mut stmt = conn.prepare(&sql)?;
1240 let edge_rows = stmt.query_map([], |row| {
1241 Ok(GraphEdge {
1242 from_id: row.get(0)?,
1243 to_id: row.get(1)?,
1244 dep_type: row.get(2)?,
1245 })
1246 })?;
1247
1248 for edge in edge_rows.flatten() {
1249 seen_tasks.insert(edge.from_id.clone());
1250 seen_tasks.insert(edge.to_id.clone());
1251 edges.push(edge);
1252 }
1253
1254 if !seen_tasks.is_empty() {
1256 let placeholders: String = seen_tasks.iter()
1257 .enumerate()
1258 .map(|(i, _)| format!("?{}", i + 1))
1259 .collect::<Vec<_>>()
1260 .join(", ");
1261
1262 let node_sql = format!(
1263 "SELECT id, title, status, priority FROM tasks
1264 WHERE id IN ({}) AND deleted_at IS NULL",
1265 placeholders
1266 );
1267
1268 let mut stmt = conn.prepare(&node_sql)?;
1269 let params_vec: Vec<String> = seen_tasks.iter().cloned().collect();
1270 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec
1271 .iter()
1272 .map(|s| s as &dyn rusqlite::ToSql)
1273 .collect();
1274
1275 let node_rows = stmt.query_map(params_refs.as_slice(), |row| {
1276 Ok(GraphNode {
1277 id: row.get(0)?,
1278 title: row.get::<_, Option<String>>(1)?.unwrap_or_default(),
1279 status: row.get(2)?,
1280 priority: row.get(3)?,
1281 })
1282 })?;
1283
1284 for node in node_rows.flatten() {
1285 nodes.push(node);
1286 }
1287 }
1288 }
1289
1290 Ok(DependencyGraph { nodes, edges })
1291 })
1292 }
1293
1294 pub fn get_dependency_graph_stats(&self) -> Result<DependencyGraphStats> {
1296 self.with_conn(|conn| {
1297 let total_tasks: i64 = conn.query_row(
1298 "SELECT COUNT(*) FROM tasks WHERE deleted_at IS NULL",
1299 [],
1300 |row| row.get(0),
1301 )?;
1302
1303 let total_deps: i64 = conn.query_row(
1304 "SELECT COUNT(*) FROM dependencies d
1305 JOIN tasks t1 ON d.from_task_id = t1.id
1306 JOIN tasks t2 ON d.to_task_id = t2.id
1307 WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL",
1308 [],
1309 |row| row.get(0),
1310 )?;
1311
1312 let blocks_count: i64 = conn.query_row(
1313 "SELECT COUNT(*) FROM dependencies d
1314 JOIN tasks t1 ON d.from_task_id = t1.id
1315 JOIN tasks t2 ON d.to_task_id = t2.id
1316 WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL AND d.dep_type = 'blocks'",
1317 [],
1318 |row| row.get(0),
1319 )?;
1320
1321 let follows_count: i64 = conn.query_row(
1322 "SELECT COUNT(*) FROM dependencies d
1323 JOIN tasks t1 ON d.from_task_id = t1.id
1324 JOIN tasks t2 ON d.to_task_id = t2.id
1325 WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL AND d.dep_type = 'follows'",
1326 [],
1327 |row| row.get(0),
1328 )?;
1329
1330 let contains_count: i64 = conn.query_row(
1331 "SELECT COUNT(*) FROM dependencies d
1332 JOIN tasks t1 ON d.from_task_id = t1.id
1333 JOIN tasks t2 ON d.to_task_id = t2.id
1334 WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL AND d.dep_type = 'contains'",
1335 [],
1336 |row| row.get(0),
1337 )?;
1338
1339 Ok(DependencyGraphStats {
1340 total_tasks,
1341 total_deps,
1342 blocks_count,
1343 follows_count,
1344 contains_count,
1345 })
1346 })
1347 }
1348
1349 pub fn get_available_phases(&self) -> Result<Vec<String>> {
1351 self.with_conn(|conn| {
1352 let mut stmt = conn.prepare(
1353 "SELECT DISTINCT phase FROM tasks WHERE phase IS NOT NULL AND deleted_at IS NULL ORDER BY phase"
1354 )?;
1355
1356 let phases = stmt
1357 .query_map([], |row| row.get(0))?
1358 .filter_map(|r| r.ok())
1359 .collect();
1360
1361 Ok(phases)
1362 })
1363 }
1364}
1365
/// One file mark, in the shape handed to the dashboard.
#[derive(Debug, Clone)]
pub struct DashboardFileMark {
    /// Path of the marked file.
    pub file_path: String,
    /// Identifier of the worker that holds the mark.
    pub worker_id: String,
    /// Optional free-text reason recorded with the mark.
    pub reason: Option<String>,
    /// When the mark was taken. NOTE(review): presumably a Unix timestamp;
    /// the unit is not visible in this file section — confirm with the writer.
    pub locked_at: i64,
    /// Task associated with the mark, if any.
    pub task_id: Option<String>,
}
1375
/// Aggregate counters describing the current set of file marks.
#[derive(Debug, Clone)]
pub struct FileMarksStats {
    /// Total number of file marks.
    pub total_marks: i64,
    /// Number of distinct agents holding at least one mark.
    pub unique_agents: i64,
    /// Number of marks that are linked to a task.
    pub with_tasks: i64,
    /// Number of marks considered stale. NOTE(review): the staleness
    /// threshold is defined by the query that fills this struct, which is
    /// not shown here.
    pub stale_marks: i64,
}
1384
/// Top-level totals for the metrics view.
///
/// NOTE(review): which tasks are included (e.g. whether soft-deleted tasks
/// are filtered out) is decided by the producing query, not visible here.
#[derive(Debug, Clone)]
pub struct MetricsOverview {
    /// Number of tasks counted.
    pub total_tasks: i64,
    /// Number of those tasks that are completed.
    pub completed_tasks: i64,
    /// Summed cost, in US dollars per the field name.
    pub total_cost_usd: f64,
    /// Summed time, in milliseconds per the field name.
    pub total_time_ms: i64,
    /// Summed points across the counted tasks.
    pub total_points: i64,
    /// Summed points across the completed tasks.
    pub completed_points: i64,
}
1397
/// One bucket of a velocity series: what was completed during one period.
#[derive(Debug, Clone)]
pub struct VelocityDataPoint {
    /// Display label for the period. NOTE(review): the label format and
    /// period length are chosen by the producer — not visible here.
    pub period_label: String,
    /// Number of tasks completed in the period.
    pub completed_count: i64,
    /// Points attributed to the period.
    pub total_points: i64,
}
1405
/// Duration statistics for a single task status.
#[derive(Debug, Clone)]
pub struct TimeInStatusStats {
    /// The status these figures describe.
    pub status: String,
    /// Average duration spent in the status, in milliseconds per the field name.
    pub avg_duration_ms: i64,
    /// Total duration spent in the status, in milliseconds per the field name.
    pub total_duration_ms: i64,
    /// Number of transitions that contributed to the figures above.
    pub transition_count: i64,
}
1414
/// Cost and throughput totals for a single worker.
#[derive(Debug, Clone)]
pub struct AgentCostStats {
    /// Identifier of the worker the totals belong to.
    pub worker_id: String,
    /// Summed cost, in US dollars per the field name.
    pub total_cost_usd: f64,
    /// Number of tasks attributed to the worker.
    pub task_count: i64,
    /// Number of those tasks that are completed.
    pub completed_count: i64,
    /// Summed time, in milliseconds per the field name.
    pub total_time_ms: i64,
}
1424
/// Aggregated values for the eight custom-metric slots.
#[derive(Debug, Clone)]
pub struct CustomMetricsAggregate {
    /// One aggregated value per slot. NOTE(review): the meaning of each slot
    /// and the aggregation applied are defined by the producer — confirm there.
    pub metrics: [i64; 8],
}
1430
/// A task rendered as a node of the dependency graph (serializable via serde).
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphNode {
    /// Task id.
    pub id: String,
    /// Task title; the graph builder in this file substitutes an empty
    /// string when the stored title is NULL.
    pub title: String,
    /// Task status, as stored in the `tasks` table.
    pub status: String,
    /// Task priority, as stored in the `tasks` table.
    pub priority: i32,
}
1441
/// A directed dependency between two tasks (serializable via serde).
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphEdge {
    /// Id of the task the dependency starts from (`dependencies.from_task_id`).
    pub from_id: String,
    /// Id of the task the dependency points to (`dependencies.to_task_id`).
    pub to_id: String,
    /// Dependency type string; the stats queries in this file look for
    /// `blocks`, `follows` and `contains`.
    pub dep_type: String,
}
1449
/// A dependency graph as a flat node list plus edge list (serializable via serde).
#[derive(Debug, Clone, serde::Serialize)]
pub struct DependencyGraph {
    /// Tasks that appear in the graph.
    pub nodes: Vec<GraphNode>,
    /// Dependencies between tasks, referring to them by id.
    pub edges: Vec<GraphEdge>,
}
1456
/// Summary counts for the dependency graph, as produced by
/// `get_dependency_graph_stats` (serializable via serde).
#[derive(Debug, Clone, serde::Serialize)]
pub struct DependencyGraphStats {
    /// Number of tasks whose `deleted_at` is NULL.
    pub total_tasks: i64,
    /// Number of dependencies whose two endpoint tasks both have
    /// `deleted_at` NULL.
    pub total_deps: i64,
    /// Subset of `total_deps` with `dep_type = 'blocks'`.
    pub blocks_count: i64,
    /// Subset of `total_deps` with `dep_type = 'follows'`.
    pub follows_count: i64,
    /// Subset of `total_deps` with `dep_type = 'contains'`.
    pub contains_count: i64,
}