1use super::Database;
6use anyhow::Result;
7use rusqlite::params;
8use std::collections::HashMap;
9
/// Compact task row shown in the dashboard's recent-task list.
///
/// Produced by `Database::get_recent_tasks` from the `id`, `title`, `status`
/// and `priority` columns of the `tasks` table.
#[derive(Debug, Clone)]
pub struct DashboardTask {
    /// Task identifier (`tasks.id`).
    pub id: String,
    /// Task title; `None` when the column is NULL.
    pub title: Option<String>,
    /// Status string exactly as stored in the database.
    pub status: String,
    /// Stored priority value.
    pub priority: i32,
}
18
/// One row of the paginated task list returned by `Database::query_tasks`.
#[derive(Debug, Clone)]
pub struct TaskListItem {
    /// Task identifier (`tasks.id`).
    pub id: String,
    /// Task title; `None` when the column is NULL.
    pub title: Option<String>,
    /// Status string exactly as stored in the database.
    pub status: String,
    /// Stored priority value.
    pub priority: i32,
    /// Worker currently associated with the task, if any.
    pub worker_id: Option<String>,
    /// Raw contents of the `tags` column; an empty string when the column is NULL.
    pub tags: String,
    /// Creation timestamp (presumably epoch milliseconds, like `now_ms()` — confirm).
    pub created_at: i64,
    /// Last-update timestamp (presumably epoch milliseconds, like `now_ms()` — confirm).
    pub updated_at: i64,
}
31
/// Filter, sort and paging options accepted by `Database::query_tasks`.
///
/// `Default` yields empty sort strings and `page == 0`, `limit == 0`; callers
/// normally set `page` and `limit` explicitly.
#[derive(Debug, Clone, Default)]
pub struct TaskListQuery {
    /// Exact status to match; `None` or an empty string disables the filter.
    pub status: Option<String>,
    /// Phase filter. NOTE(review): not read by `query_tasks` in the visible code — confirm usage.
    pub phase: Option<String>,
    /// Comma-separated tags; a task matches when its `tags` column contains any of them.
    pub tags: Option<String>,
    /// Parent task id; restricts results to targets of its `contains` dependencies.
    pub parent: Option<String>,
    /// Worker id that must match the task's `worker_id`.
    pub owner: Option<String>,
    /// Sort key: `priority`, `created`/`created_at` or `updated`/`updated_at`.
    pub sort_by: String,
    /// Sort direction: `asc` or `desc`.
    pub sort_order: String,
    /// 1-based page number.
    pub page: i32,
    /// Page size.
    pub limit: i32,
    /// NOTE(review): not read by `query_tasks` in the visible code — confirm usage.
    pub timed_filter: Option<bool>,
    /// NOTE(review): not read by `query_tasks` in the visible code — confirm usage.
    pub timed_states: Vec<String>,
}
49
50#[derive(Debug, Clone)]
52pub struct TaskListResult {
53 pub tasks: Vec<TaskListItem>,
54 pub total: i64,
55 pub page: i32,
56 pub limit: i32,
57 pub total_pages: i32,
58}
59
60#[derive(Debug, Clone)]
62pub struct ActivityEvent {
63 pub id: i64,
64 pub event_type: ActivityEventType,
65 pub timestamp: i64,
66 pub worker_id: Option<String>,
67 pub task_id: Option<String>,
68 pub file_path: Option<String>,
69 pub from_status: Option<String>,
70 pub to_status: Option<String>,
71 pub reason: Option<String>,
72}
73
/// Kind of entry in the activity feed.
#[derive(Debug, Clone, PartialEq)]
pub enum ActivityEventType {
    /// A task changed status.
    TaskTransition,
    /// A worker claimed a file.
    FileClaim,
    /// A file claim ended.
    FileRelease,
}

impl ActivityEventType {
    /// Returns the short lowercase name of the variant:
    /// `"transition"`, `"claim"` or `"release"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::TaskTransition => "transition",
            Self::FileClaim => "claim",
            Self::FileRelease => "release",
        }
    }
}
91
/// Filter and paging options accepted by `Database::query_activity`.
///
/// `Default` leaves every filter unset and `page`/`limit` at 0; callers
/// normally set `page` and `limit` explicitly.
#[derive(Debug, Clone, Default)]
pub struct ActivityListQuery {
    /// `"transition"` for task transitions only, `"file"` for file events only,
    /// `None` for both.
    pub event_type: Option<String>,
    /// Exact transition status to match.
    pub status: Option<String>,
    /// Worker id that must match the event's worker.
    pub worker: Option<String>,
    /// Substring that must occur in the event's task id.
    pub task: Option<String>,
    /// 1-based page number.
    pub page: i32,
    /// Page size.
    pub limit: i32,
}
102
103#[derive(Debug, Clone)]
105pub struct ActivityListResult {
106 pub events: Vec<ActivityEvent>,
107 pub total: i64,
108 pub page: i32,
109 pub limit: i32,
110 pub total_pages: i32,
111}
112
/// Activity summary computed by `Database::get_activity_stats`.
#[derive(Debug, Clone)]
pub struct ActivityStats {
    /// Sum of `transitions_24h` and `file_events_24h`.
    pub total_events_24h: i64,
    /// Rows added to `task_sequence` during the last 24 hours.
    pub transitions_24h: i64,
    /// Rows added to `claim_sequence` during the last 24 hours.
    pub file_events_24h: i64,
    /// Workers whose last heartbeat is at most five minutes old.
    pub active_workers: i64,
    /// Transition counts for the last 24 hours, keyed by status.
    pub events_by_status: HashMap<String, i64>,
}
122
/// Summary of one recently active worker, as listed by `Database::get_active_workers`.
#[derive(Debug, Clone)]
pub struct DashboardWorker {
    /// Worker identifier (`workers.id`).
    pub id: String,
    /// A non-NULL `current_thought` taken from one of the worker's `working` tasks, if any.
    pub current_thought: Option<String>,
    /// Number of tasks in status `working` assigned to the worker.
    pub claim_count: i32,
}
130
/// A task currently held by a worker, as returned by
/// `Database::get_worker_claimed_tasks`.
#[derive(Debug, Clone)]
pub struct WorkerClaimedTask {
    /// Task identifier (`tasks.id`).
    pub id: String,
    /// Task title; `None` when the column is NULL.
    pub title: Option<String>,
    /// Status string exactly as stored in the database.
    pub status: String,
    /// The task's `current_thought` column, if set.
    pub current_thought: Option<String>,
}
139
140impl Database {
141 pub fn get_task_stats(&self) -> Result<(i64, i64, i64)> {
143 self.with_conn(|conn| {
144 let total: i64 = conn.query_row(
145 "SELECT COUNT(*) FROM tasks WHERE deleted_at IS NULL",
146 [],
147 |row| row.get(0),
148 )?;
149
150 let working: i64 = conn.query_row(
151 "SELECT COUNT(*) FROM tasks WHERE status = 'working' AND deleted_at IS NULL",
152 [],
153 |row| row.get(0),
154 )?;
155
156 let completed: i64 = conn.query_row(
157 "SELECT COUNT(*) FROM tasks WHERE status = 'completed' AND deleted_at IS NULL",
158 [],
159 |row| row.get(0),
160 )?;
161
162 Ok((total, working, completed))
163 })
164 }
165
166 pub fn get_active_worker_count(&self) -> Result<i64> {
168 self.with_conn(|conn| {
169 let cutoff = super::now_ms() - (5 * 60 * 1000);
171 let count: i64 = conn.query_row(
172 "SELECT COUNT(*) FROM workers WHERE last_heartbeat > ?1",
173 params![cutoff],
174 |row| row.get(0),
175 )?;
176 Ok(count)
177 })
178 }
179
180 pub fn get_recent_tasks(&self, limit: i32) -> Result<Vec<DashboardTask>> {
182 self.with_conn(|conn| {
183 let mut stmt = conn.prepare(
184 "SELECT id, title, status, priority
185 FROM tasks
186 WHERE deleted_at IS NULL
187 ORDER BY updated_at DESC
188 LIMIT ?1",
189 )?;
190
191 let tasks = stmt
192 .query_map(params![limit], |row| {
193 let id: String = row.get(0)?;
194 let title: Option<String> = row.get(1)?;
195 let status: String = row.get(2)?;
196 let priority: i32 = row.get(3)?;
197 Ok(DashboardTask {
198 id,
199 title,
200 status,
201 priority,
202 })
203 })?
204 .filter_map(|r| r.ok())
205 .collect();
206
207 Ok(tasks)
208 })
209 }
210
211 pub fn get_active_workers(&self) -> Result<Vec<DashboardWorker>> {
213 self.with_conn(|conn| {
214 let cutoff = super::now_ms() - (5 * 60 * 1000);
216
217 let mut stmt = conn.prepare(
218 "SELECT w.id,
219 (SELECT current_thought FROM tasks WHERE worker_id = w.id AND status = 'working' AND current_thought IS NOT NULL LIMIT 1),
220 (SELECT COUNT(*) FROM tasks WHERE worker_id = w.id AND status = 'working')
221 FROM workers w
222 WHERE w.last_heartbeat > ?1
223 ORDER BY w.last_heartbeat DESC"
224 )?;
225
226 let workers = stmt
227 .query_map(params![cutoff], |row| {
228 let id: String = row.get(0)?;
229 let current_thought: Option<String> = row.get(1)?;
230 let claim_count: i32 = row.get(2)?;
231 Ok(DashboardWorker { id, current_thought, claim_count })
232 })?
233 .filter_map(|r| r.ok())
234 .collect();
235
236 Ok(workers)
237 })
238 }
239
240 pub fn query_tasks(&self, query: &TaskListQuery) -> Result<TaskListResult> {
242 self.with_conn(|conn| {
243 let mut sql = String::from(
244 "SELECT t.id, t.title, t.status, t.priority, t.worker_id, t.tags, t.created_at, t.updated_at
245 FROM tasks t
246 WHERE t.deleted_at IS NULL"
247 );
248
249 let mut count_sql = String::from(
250 "SELECT COUNT(*) FROM tasks t WHERE t.deleted_at IS NULL"
251 );
252
253 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
254 let mut param_idx = 1;
255
256 if let Some(ref status) = query.status {
258 if !status.is_empty() {
259 let clause = format!(" AND t.status = ?{}", param_idx);
260 sql.push_str(&clause);
261 count_sql.push_str(&clause);
262 params_vec.push(Box::new(status.clone()));
263 param_idx += 1;
264 }
265 }
266
267 if let Some(ref owner) = query.owner {
269 if !owner.is_empty() {
270 let clause = format!(" AND t.worker_id = ?{}", param_idx);
271 sql.push_str(&clause);
272 count_sql.push_str(&clause);
273 params_vec.push(Box::new(owner.clone()));
274 param_idx += 1;
275 }
276 }
277
278 if let Some(ref parent) = query.parent {
280 if !parent.is_empty() {
281 let clause = format!(
282 " AND t.id IN (SELECT to_task_id FROM dependencies WHERE from_task_id = ?{} AND dep_type = 'contains')",
283 param_idx
284 );
285 sql.push_str(&clause);
286 count_sql.push_str(&clause);
287 params_vec.push(Box::new(parent.clone()));
288 param_idx += 1;
289 }
290 }
291
292 if let Some(ref tags) = query.tags {
294 if !tags.is_empty() {
295 let tag_list: Vec<&str> = tags.split(',').map(|t| t.trim()).filter(|t| !t.is_empty()).collect();
296 if !tag_list.is_empty() {
297 let mut tag_conditions = Vec::new();
298 for tag in tag_list {
299 tag_conditions.push(format!("t.tags LIKE '%' || ?{} || '%'", param_idx));
300 params_vec.push(Box::new(tag.to_string()));
301 param_idx += 1;
302 }
303 let clause = format!(" AND ({})", tag_conditions.join(" OR "));
304 sql.push_str(&clause);
305 count_sql.push_str(&clause);
306 }
307 }
308 }
309
310 let order_clause = match (query.sort_by.as_str(), query.sort_order.as_str()) {
312 ("priority", "asc") => " ORDER BY t.priority ASC, t.created_at DESC",
313 ("priority", "desc") | ("priority", _) => " ORDER BY t.priority DESC, t.created_at DESC",
314 ("created", "asc") | ("created_at", "asc") => " ORDER BY t.created_at ASC",
315 ("created", "desc") | ("created_at", "desc") => " ORDER BY t.created_at DESC",
316 ("updated", "asc") | ("updated_at", "asc") => " ORDER BY t.updated_at ASC",
317 ("updated", "desc") | ("updated_at", "desc") => " ORDER BY t.updated_at DESC",
318 _ => " ORDER BY t.priority DESC, t.created_at DESC",
319 };
320 sql.push_str(order_clause);
321
322 let offset = (query.page - 1) * query.limit;
324 sql.push_str(&format!(" LIMIT {} OFFSET {}", query.limit, offset));
325
326 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
328 let total: i64 = conn.query_row(&count_sql, params_refs.as_slice(), |row| row.get(0))?;
329
330 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec.iter().map(|b| b.as_ref()).collect();
332 let mut stmt = conn.prepare(&sql)?;
333
334 let tasks = stmt
335 .query_map(params_refs.as_slice(), |row| {
336 Ok(TaskListItem {
337 id: row.get(0)?,
338 title: row.get(1)?,
339 status: row.get(2)?,
340 priority: row.get(3)?,
341 worker_id: row.get(4)?,
342 tags: row.get::<_, Option<String>>(5)?.unwrap_or_default(),
343 created_at: row.get(6)?,
344 updated_at: row.get(7)?,
345 })
346 })?
347 .filter_map(|r| r.ok())
348 .collect();
349
350 let total_pages = ((total as f64) / (query.limit as f64)).ceil() as i32;
351
352 Ok(TaskListResult {
353 tasks,
354 total,
355 page: query.page,
356 limit: query.limit,
357 total_pages,
358 })
359 })
360 }
361
362 pub fn get_worker_claimed_tasks(&self, worker_id: &str) -> Result<Vec<WorkerClaimedTask>> {
364 self.with_conn(|conn| {
365 let mut stmt = conn.prepare(
366 "SELECT id, title, status, current_thought
367 FROM tasks
368 WHERE worker_id = ?1 AND status = 'working' AND deleted_at IS NULL
369 ORDER BY claimed_at DESC",
370 )?;
371
372 let tasks = stmt
373 .query_map(params![worker_id], |row| {
374 let id: String = row.get(0)?;
375 let title: Option<String> = row.get(1)?;
376 let status: String = row.get(2)?;
377 let current_thought: Option<String> = row.get(3)?;
378 Ok(WorkerClaimedTask {
379 id,
380 title,
381 status,
382 current_thought,
383 })
384 })?
385 .filter_map(|r| r.ok())
386 .collect();
387
388 Ok(tasks)
389 })
390 }
391
392 pub fn dashboard_update_task(
395 &self,
396 task_id: &str,
397 status: Option<&str>,
398 priority: Option<i32>,
399 description: Option<&str>,
400 tags: Option<Vec<String>>,
401 ) -> Result<()> {
402 let now = super::now_ms();
403
404 self.with_conn(|conn| {
405 let mut updates = vec!["updated_at = ?1".to_string()];
407 let mut param_idx = 2;
408
409 if status.is_some() {
410 updates.push(format!("status = ?{}", param_idx));
411 param_idx += 1;
412 }
413 if priority.is_some() {
414 updates.push(format!("priority = ?{}", param_idx));
415 param_idx += 1;
416 }
417 if description.is_some() {
418 updates.push(format!("description = ?{}", param_idx));
419 param_idx += 1;
420 }
421 if tags.is_some() {
422 updates.push(format!("tags = ?{}", param_idx));
423 param_idx += 1;
424 }
425
426 let sql = format!(
427 "UPDATE tasks SET {} WHERE id = ?{}",
428 updates.join(", "),
429 param_idx
430 );
431
432 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
434 params_vec.push(Box::new(now));
435
436 if let Some(s) = status {
437 params_vec.push(Box::new(s.to_string()));
438 }
439 if let Some(p) = priority {
440 params_vec.push(Box::new(p));
441 }
442 if let Some(d) = description {
443 params_vec.push(Box::new(d.to_string()));
444 }
445 if let Some(t) = tags {
446 params_vec.push(Box::new(serde_json::to_string(&t)?));
447 }
448 params_vec.push(Box::new(task_id.to_string()));
449
450 let params_refs: Vec<&dyn rusqlite::ToSql> =
451 params_vec.iter().map(|b| b.as_ref()).collect();
452 let rows_affected = conn.execute(&sql, params_refs.as_slice())?;
453
454 if rows_affected == 0 {
455 return Err(anyhow::anyhow!("Task not found"));
456 }
457
458 Ok(())
459 })
460 }
461
462 pub fn dashboard_delete_task(&self, task_id: &str) -> Result<()> {
464 let now = super::now_ms();
465
466 self.with_conn(|conn| {
467 let child_count: i32 = conn.query_row(
469 "SELECT COUNT(*) FROM dependencies WHERE from_task_id = ?1 AND dep_type = 'contains'",
470 params![task_id],
471 |row| row.get(0),
472 )?;
473
474 if child_count > 0 {
475 return Err(anyhow::anyhow!("Task has children; delete them first"));
476 }
477
478 let rows_affected = conn.execute(
479 "UPDATE tasks SET deleted_at = ?1, updated_at = ?1 WHERE id = ?2 AND deleted_at IS NULL",
480 params![now, task_id],
481 )?;
482
483 if rows_affected == 0 {
484 return Err(anyhow::anyhow!("Task not found or already deleted"));
485 }
486
487 Ok(())
488 })
489 }
490
491 pub fn dashboard_force_release_task(&self, task_id: &str) -> Result<()> {
494 let now = super::now_ms();
495
496 self.with_conn(|conn| {
497 let rows_affected = conn.execute(
498 "UPDATE tasks SET
499 status = 'pending',
500 worker_id = NULL,
501 claimed_at = NULL,
502 current_thought = NULL,
503 updated_at = ?1
504 WHERE id = ?2 AND deleted_at IS NULL",
505 params![now, task_id],
506 )?;
507
508 if rows_affected == 0 {
509 return Err(anyhow::anyhow!("Task not found or already deleted"));
510 }
511
512 Ok(())
513 })
514 }
515
516 pub fn get_activity_stats(&self) -> Result<ActivityStats> {
518 let now = super::now_ms();
519 let cutoff_24h = now - (24 * 60 * 60 * 1000);
520
521 self.with_conn(|conn| {
522 let transitions_24h: i64 = conn.query_row(
524 "SELECT COUNT(*) FROM task_sequence WHERE timestamp >= ?1",
525 params![cutoff_24h],
526 |row| row.get(0),
527 )?;
528
529 let file_events_24h: i64 = conn.query_row(
531 "SELECT COUNT(*) FROM claim_sequence WHERE timestamp >= ?1",
532 params![cutoff_24h],
533 |row| row.get(0),
534 )?;
535
536 let total_events_24h = transitions_24h + file_events_24h;
537
538 let worker_cutoff = now - (5 * 60 * 1000);
540 let active_workers: i64 = conn.query_row(
541 "SELECT COUNT(*) FROM workers WHERE last_heartbeat >= ?1",
542 params![worker_cutoff],
543 |row| row.get(0),
544 )?;
545
546 let mut events_by_status = HashMap::new();
548 let mut stmt = conn.prepare(
549 "SELECT status, COUNT(*) FROM task_sequence
550 WHERE timestamp >= ?1 AND status IS NOT NULL GROUP BY status",
551 )?;
552 let mut rows = stmt.query(params![cutoff_24h])?;
553 while let Some(row) = rows.next()? {
554 let status: String = row.get(0)?;
555 let count: i64 = row.get(1)?;
556 events_by_status.insert(status, count);
557 }
558
559 Ok(ActivityStats {
560 total_events_24h,
561 transitions_24h,
562 file_events_24h,
563 active_workers,
564 events_by_status,
565 })
566 })
567 }
568
569 pub fn query_activity(&self, query: &ActivityListQuery) -> Result<ActivityListResult> {
571 self.with_conn(|conn| {
572 let mut events = Vec::new();
576 let mut total: i64 = 0;
577
578 let include_transitions =
580 query.event_type.is_none() || query.event_type.as_deref() == Some("transition");
581 let include_files =
582 query.event_type.is_none() || query.event_type.as_deref() == Some("file");
583
584 if include_transitions {
586 let mut sql = String::from(
587 "SELECT id, task_id, worker_id, status, reason, timestamp
588 FROM task_sequence WHERE status IS NOT NULL",
589 );
590 let mut count_sql =
591 String::from("SELECT COUNT(*) FROM task_sequence WHERE status IS NOT NULL");
592 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
593 let mut param_idx = 1;
594
595 if let Some(ref status) = query.status {
597 if !status.is_empty() {
598 sql.push_str(&format!(" AND status = ?{}", param_idx));
599 count_sql.push_str(&format!(" AND status = ?{}", param_idx));
600 params_vec.push(Box::new(status.clone()));
601 param_idx += 1;
602 }
603 }
604
605 if let Some(ref worker) = query.worker {
607 if !worker.is_empty() {
608 sql.push_str(&format!(" AND worker_id = ?{}", param_idx));
609 count_sql.push_str(&format!(" AND worker_id = ?{}", param_idx));
610 params_vec.push(Box::new(worker.clone()));
611 param_idx += 1;
612 }
613 }
614
615 if let Some(ref task) = query.task {
617 if !task.is_empty() {
618 sql.push_str(&format!(" AND task_id LIKE '%' || ?{} || '%'", param_idx));
619 count_sql
620 .push_str(&format!(" AND task_id LIKE '%' || ?{} || '%'", param_idx));
621 params_vec.push(Box::new(task.clone()));
622 let _ = param_idx; }
624 }
625
626 sql.push_str(" ORDER BY timestamp DESC");
627
628 let params_refs: Vec<&dyn rusqlite::ToSql> =
630 params_vec.iter().map(|b| b.as_ref()).collect();
631 let trans_count: i64 =
632 conn.query_row(&count_sql, params_refs.as_slice(), |row| row.get(0))?;
633 total += trans_count;
634
635 if !include_files {
637 let offset = (query.page - 1) * query.limit;
638 sql.push_str(&format!(" LIMIT {} OFFSET {}", query.limit, offset));
639 }
640
641 let params_refs: Vec<&dyn rusqlite::ToSql> =
642 params_vec.iter().map(|b| b.as_ref()).collect();
643 let mut stmt = conn.prepare(&sql)?;
644 let mut rows = stmt.query(params_refs.as_slice())?;
645
646 while let Some(row) = rows.next()? {
647 let id: i64 = row.get(0)?;
648 let task_id: String = row.get(1)?;
649 let worker_id: Option<String> = row.get(2)?;
650 let event: String = row.get(3)?;
651 let reason: Option<String> = row.get(4)?;
652 let timestamp: i64 = row.get(5)?;
653
654 events.push(ActivityEvent {
655 id,
656 event_type: ActivityEventType::TaskTransition,
657 timestamp,
658 worker_id,
659 task_id: Some(task_id),
660 file_path: None,
661 from_status: None,
662 to_status: Some(event),
663 reason,
664 });
665 }
666 }
667
668 if include_files && query.status.is_none() {
670 let mut sql = String::from(
671 "SELECT id, file_path, worker_id, event, reason, timestamp
672 FROM claim_sequence WHERE 1=1",
673 );
674 let mut count_sql = String::from("SELECT COUNT(*) FROM claim_sequence WHERE 1=1");
675 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
676 let param_idx = 1;
677
678 if let Some(ref worker) = query.worker {
680 if !worker.is_empty() {
681 sql.push_str(&format!(" AND worker_id = ?{}", param_idx));
682 count_sql.push_str(&format!(" AND worker_id = ?{}", param_idx));
683 params_vec.push(Box::new(worker.clone()));
684 }
685 }
686
687 sql.push_str(" ORDER BY timestamp DESC");
691
692 let params_refs: Vec<&dyn rusqlite::ToSql> =
694 params_vec.iter().map(|b| b.as_ref()).collect();
695 let file_count: i64 =
696 conn.query_row(&count_sql, params_refs.as_slice(), |row| row.get(0))?;
697 total += file_count;
698
699 if query.task.is_none() || query.task.as_ref().map(|t| t.is_empty()).unwrap_or(true)
701 {
702 let params_refs: Vec<&dyn rusqlite::ToSql> =
703 params_vec.iter().map(|b| b.as_ref()).collect();
704 let mut stmt = conn.prepare(&sql)?;
705 let mut rows = stmt.query(params_refs.as_slice())?;
706
707 while let Some(row) = rows.next()? {
708 let id: i64 = row.get(0)?;
709 let file_path: String = row.get(1)?;
710 let worker_id: String = row.get(2)?;
711 let event: String = row.get(3)?;
712 let reason: Option<String> = row.get(4)?;
713 let timestamp: i64 = row.get(5)?;
714
715 let event_type = if event == "claimed" {
716 ActivityEventType::FileClaim
717 } else {
718 ActivityEventType::FileRelease
719 };
720
721 events.push(ActivityEvent {
722 id: id + 1_000_000_000, event_type,
724 timestamp,
725 worker_id: Some(worker_id),
726 task_id: None,
727 file_path: Some(file_path),
728 from_status: None,
729 to_status: None,
730 reason,
731 });
732 }
733 }
734 }
735
736 events.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
738
739 let offset = ((query.page - 1) * query.limit) as usize;
741 let limit = query.limit as usize;
742 let paginated_events: Vec<ActivityEvent> =
743 events.into_iter().skip(offset).take(limit).collect();
744
745 let total_pages = ((total as f64) / (query.limit as f64)).ceil() as i32;
746
747 Ok(ActivityListResult {
748 events: paginated_events,
749 total,
750 page: query.page,
751 limit: query.limit,
752 total_pages: total_pages.max(1),
753 })
754 })
755 }
756
757 pub fn get_all_file_marks(&self) -> Result<Vec<DashboardFileMark>> {
759 self.with_conn(|conn| {
760 let mut stmt = conn.prepare(
761 "SELECT file_path, worker_id, reason, locked_at, task_id
762 FROM file_locks
763 ORDER BY locked_at DESC",
764 )?;
765
766 let marks = stmt
767 .query_map([], |row| {
768 Ok(DashboardFileMark {
769 file_path: row.get(0)?,
770 worker_id: row.get(1)?,
771 reason: row.get(2)?,
772 locked_at: row.get(3)?,
773 task_id: row.get(4)?,
774 })
775 })?
776 .filter_map(|r| r.ok())
777 .collect();
778
779 Ok(marks)
780 })
781 }
782
783 pub fn get_file_marks_stats(&self) -> Result<FileMarksStats> {
785 self.with_conn(|conn| {
786 let total_marks: i64 =
787 conn.query_row("SELECT COUNT(*) FROM file_locks", [], |row| row.get(0))?;
788
789 let unique_agents: i64 = conn.query_row(
790 "SELECT COUNT(DISTINCT worker_id) FROM file_locks",
791 [],
792 |row| row.get(0),
793 )?;
794
795 let with_tasks: i64 = conn.query_row(
796 "SELECT COUNT(*) FROM file_locks WHERE task_id IS NOT NULL",
797 [],
798 |row| row.get(0),
799 )?;
800
801 let now = super::now_ms();
803 let stale_cutoff = now - (60 * 60 * 1000); let stale_marks: i64 = conn.query_row(
805 "SELECT COUNT(*) FROM file_locks WHERE locked_at < ?1",
806 params![stale_cutoff],
807 |row| row.get(0),
808 )?;
809
810 Ok(FileMarksStats {
811 total_marks,
812 unique_agents,
813 with_tasks,
814 stale_marks,
815 })
816 })
817 }
818
819 pub fn force_unmark_file(&self, file_path: &str) -> Result<bool> {
822 let now = super::now_ms();
823
824 self.with_conn_mut(|conn| {
825 let tx = conn.transaction()?;
826
827 let owner: Option<String> = tx.query_row(
829 "SELECT worker_id FROM file_locks WHERE file_path = ?1",
830 params![file_path],
831 |row| row.get(0),
832 ).ok();
833
834 let deleted = tx.execute(
835 "DELETE FROM file_locks WHERE file_path = ?1",
836 params![file_path],
837 )?;
838
839 if deleted > 0 {
840 if let Some(worker_id) = owner {
841 let claim_id: Option<i64> = tx.query_row(
843 "SELECT MAX(id) FROM claim_sequence
844 WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
845 params![file_path, &worker_id],
846 |row| row.get(0),
847 ).ok().flatten();
848
849 tx.execute(
851 "UPDATE claim_sequence SET end_timestamp = ?1
852 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
853 params![now, file_path, &worker_id],
854 )?;
855
856 tx.execute(
858 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
859 VALUES (?1, ?2, 'released', 'Force-unmarked via dashboard', ?3, ?4)",
860 params![file_path, &worker_id, now, claim_id],
861 )?;
862 }
863 }
864
865 tx.commit()?;
866 Ok(deleted > 0)
867 })
868 }
869
870 pub fn get_metrics_overview(&self) -> Result<MetricsOverview> {
874 self.with_conn(|conn| {
875 let row: (i64, i64, f64, i64, i64, i64) = conn.query_row(
876 "SELECT
877 COUNT(*) as total_tasks,
878 SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed_tasks,
879 COALESCE(SUM(cost_usd), 0.0) as total_cost,
880 COALESCE(SUM(time_actual_ms), 0) as total_time,
881 COALESCE(SUM(points), 0) as total_points,
882 COALESCE(SUM(CASE WHEN status = 'completed' THEN points ELSE 0 END), 0) as completed_points
883 FROM tasks
884 WHERE deleted_at IS NULL",
885 [],
886 |row| Ok((
887 row.get(0)?,
888 row.get(1)?,
889 row.get(2)?,
890 row.get(3)?,
891 row.get(4)?,
892 row.get(5)?,
893 )),
894 )?;
895
896 Ok(MetricsOverview {
897 total_tasks: row.0,
898 completed_tasks: row.1,
899 total_cost_usd: row.2,
900 total_time_ms: row.3,
901 total_points: row.4,
902 completed_points: row.5,
903 })
904 })
905 }
906
907 pub fn get_status_distribution(&self) -> Result<HashMap<String, i64>> {
909 self.with_conn(|conn| {
910 let mut stmt = conn.prepare(
911 "SELECT status, COUNT(*) as count
912 FROM tasks
913 WHERE deleted_at IS NULL
914 GROUP BY status",
915 )?;
916
917 let mut distribution = HashMap::new();
918 let rows = stmt.query_map([], |row| {
919 let status: String = row.get(0)?;
920 let count: i64 = row.get(1)?;
921 Ok((status, count))
922 })?;
923
924 for row in rows {
925 let (status, count) = row?;
926 distribution.insert(status, count);
927 }
928
929 Ok(distribution)
930 })
931 }
932
933 pub fn get_velocity(&self, period: &str, num_periods: i32) -> Result<Vec<VelocityDataPoint>> {
936 self.with_conn(|conn| {
937 let now = super::now_ms();
938 let period_ms: i64 = match period {
939 "week" => 7 * 24 * 60 * 60 * 1000,
940 _ => 24 * 60 * 60 * 1000, };
942
943 let mut data_points = Vec::new();
944
945 for i in 0..num_periods {
946 let period_end = now - (i as i64 * period_ms);
947 let period_start = period_end - period_ms;
948
949 let (count, points): (i64, i64) = conn.query_row(
950 "SELECT COUNT(*), COALESCE(SUM(points), 0)
951 FROM tasks
952 WHERE deleted_at IS NULL
953 AND status = 'completed'
954 AND completed_at >= ?1
955 AND completed_at < ?2",
956 params![period_start, period_end],
957 |row| Ok((row.get(0)?, row.get(1)?)),
958 )?;
959
960 let label = if period == "week" {
962 if i == 0 {
963 "This week".to_string()
964 } else if i == 1 {
965 "Last week".to_string()
966 } else {
967 format!("{} weeks ago", i)
968 }
969 } else {
970 if i == 0 {
971 "Today".to_string()
972 } else if i == 1 {
973 "Yesterday".to_string()
974 } else {
975 format!("{} days ago", i)
976 }
977 };
978
979 data_points.push(VelocityDataPoint {
980 period_label: label,
981 completed_count: count,
982 total_points: points,
983 });
984 }
985
986 data_points.reverse();
988 Ok(data_points)
989 })
990 }
991
992 pub fn get_time_in_status(&self) -> Result<Vec<TimeInStatusStats>> {
994 self.with_conn(|conn| {
995 let mut stmt = conn.prepare(
996 "SELECT
997 status,
998 AVG(COALESCE(end_timestamp, ?1) - timestamp) as avg_duration,
999 SUM(COALESCE(end_timestamp, ?1) - timestamp) as total_duration,
1000 COUNT(*) as transition_count
1001 FROM task_sequence
1002 WHERE status IS NOT NULL
1003 GROUP BY status
1004 ORDER BY avg_duration DESC",
1005 )?;
1006
1007 let now = super::now_ms();
1008 let stats = stmt
1009 .query_map(params![now], |row| {
1010 Ok(TimeInStatusStats {
1011 status: row.get(0)?,
1012 avg_duration_ms: row.get::<_, f64>(1)? as i64,
1013 total_duration_ms: row.get::<_, f64>(2)? as i64,
1014 transition_count: row.get(3)?,
1015 })
1016 })?
1017 .filter_map(|r| r.ok())
1018 .collect();
1019
1020 Ok(stats)
1021 })
1022 }
1023
1024 pub fn get_cost_by_agent(&self) -> Result<Vec<AgentCostStats>> {
1026 self.with_conn(|conn| {
1027 let mut stmt = conn.prepare(
1028 "SELECT
1029 worker_id,
1030 COALESCE(SUM(cost_usd), 0.0) as total_cost,
1031 COUNT(*) as task_count,
1032 SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed_count,
1033 COALESCE(SUM(time_actual_ms), 0) as total_time
1034 FROM tasks
1035 WHERE deleted_at IS NULL AND worker_id IS NOT NULL
1036 GROUP BY worker_id
1037 ORDER BY total_cost DESC",
1038 )?;
1039
1040 let stats = stmt
1041 .query_map([], |row| {
1042 Ok(AgentCostStats {
1043 worker_id: row.get(0)?,
1044 total_cost_usd: row.get(1)?,
1045 task_count: row.get(2)?,
1046 completed_count: row.get(3)?,
1047 total_time_ms: row.get(4)?,
1048 })
1049 })?
1050 .filter_map(|r| r.ok())
1051 .collect();
1052
1053 Ok(stats)
1054 })
1055 }
1056
1057 pub fn get_custom_metrics(&self) -> Result<CustomMetricsAggregate> {
1059 self.with_conn(|conn| {
1060 let row: (i64, i64, i64, i64, i64, i64, i64, i64) = conn.query_row(
1061 "SELECT
1062 COALESCE(SUM(metric_0), 0),
1063 COALESCE(SUM(metric_1), 0),
1064 COALESCE(SUM(metric_2), 0),
1065 COALESCE(SUM(metric_3), 0),
1066 COALESCE(SUM(metric_4), 0),
1067 COALESCE(SUM(metric_5), 0),
1068 COALESCE(SUM(metric_6), 0),
1069 COALESCE(SUM(metric_7), 0)
1070 FROM tasks
1071 WHERE deleted_at IS NULL",
1072 [],
1073 |row| {
1074 Ok((
1075 row.get(0)?,
1076 row.get(1)?,
1077 row.get(2)?,
1078 row.get(3)?,
1079 row.get(4)?,
1080 row.get(5)?,
1081 row.get(6)?,
1082 row.get(7)?,
1083 ))
1084 },
1085 )?;
1086
1087 Ok(CustomMetricsAggregate {
1088 metrics: [row.0, row.1, row.2, row.3, row.4, row.5, row.6, row.7],
1089 })
1090 })
1091 }
1092
1093 pub fn get_dependency_graph(
1098 &self,
1099 dep_type: Option<&str>,
1100 focus_task: Option<&str>,
1101 depth: i32,
1102 ) -> Result<DependencyGraph> {
1103 self.with_conn(|conn| {
1104 let mut nodes: Vec<GraphNode> = Vec::new();
1105 let mut edges: Vec<GraphEdge> = Vec::new();
1106 let mut seen_tasks: std::collections::HashSet<String> = std::collections::HashSet::new();
1107
1108 let type_clause = match dep_type {
1110 Some("blocks") => "AND d.dep_type = 'blocks'",
1111 Some("follows") => "AND d.dep_type = 'follows'",
1112 Some("contains") => "AND d.dep_type = 'contains'",
1113 _ => "", };
1115
1116 if let Some(focus_id) = focus_task {
1117 let actual_depth = if depth < 0 { 100 } else { depth };
1119
1120 if let Ok(task) = conn.query_row(
1122 "SELECT id, title, status, priority FROM tasks WHERE id = ?1 AND deleted_at IS NULL",
1123 params![focus_id],
1124 |row| Ok(GraphNode {
1125 id: row.get(0)?,
1126 title: row.get::<_, Option<String>>(1)?.unwrap_or_default(),
1127 status: row.get(2)?,
1128 priority: row.get(3)?,
1129 }),
1130 ) {
1131 seen_tasks.insert(task.id.clone());
1132 nodes.push(task);
1133 }
1134
1135 let mut current_level: Vec<String> = vec![focus_id.to_string()];
1137 for _ in 0..actual_depth {
1138 if current_level.is_empty() { break; }
1139 let mut next_level: Vec<String> = Vec::new();
1140
1141 for tid in ¤t_level {
1142 let sql = format!(
1143 "SELECT d.from_task_id, d.dep_type, t.id, t.title, t.status, t.priority
1144 FROM dependencies d
1145 JOIN tasks t ON d.from_task_id = t.id
1146 WHERE d.to_task_id = ?1 AND t.deleted_at IS NULL {}",
1147 type_clause
1148 );
1149
1150 let mut stmt = conn.prepare(&sql)?;
1151 let rows = stmt.query_map(params![tid], |row| {
1152 Ok((
1153 row.get::<_, String>(0)?,
1154 row.get::<_, String>(1)?,
1155 row.get::<_, String>(2)?,
1156 row.get::<_, Option<String>>(3)?,
1157 row.get::<_, String>(4)?,
1158 row.get::<_, i32>(5)?,
1159 ))
1160 })?;
1161
1162 for row in rows.flatten() {
1163 let (from_id, dep_type_str, task_id, title, status, priority) = row;
1164
1165 edges.push(GraphEdge {
1166 from_id: from_id.clone(),
1167 to_id: tid.clone(),
1168 dep_type: dep_type_str,
1169 });
1170
1171 if !seen_tasks.contains(&task_id) {
1172 seen_tasks.insert(task_id.clone());
1173 nodes.push(GraphNode {
1174 id: task_id.clone(),
1175 title: title.unwrap_or_default(),
1176 status,
1177 priority,
1178 });
1179 next_level.push(task_id);
1180 }
1181 }
1182 }
1183 current_level = next_level;
1184 }
1185
1186 let mut current_level: Vec<String> = vec![focus_id.to_string()];
1188 for _ in 0..actual_depth {
1189 if current_level.is_empty() { break; }
1190 let mut next_level: Vec<String> = Vec::new();
1191
1192 for tid in ¤t_level {
1193 let sql = format!(
1194 "SELECT d.to_task_id, d.dep_type, t.id, t.title, t.status, t.priority
1195 FROM dependencies d
1196 JOIN tasks t ON d.to_task_id = t.id
1197 WHERE d.from_task_id = ?1 AND t.deleted_at IS NULL {}",
1198 type_clause
1199 );
1200
1201 let mut stmt = conn.prepare(&sql)?;
1202 let rows = stmt.query_map(params![tid], |row| {
1203 Ok((
1204 row.get::<_, String>(0)?,
1205 row.get::<_, String>(1)?,
1206 row.get::<_, String>(2)?,
1207 row.get::<_, Option<String>>(3)?,
1208 row.get::<_, String>(4)?,
1209 row.get::<_, i32>(5)?,
1210 ))
1211 })?;
1212
1213 for row in rows.flatten() {
1214 let (to_id, dep_type_str, task_id, title, status, priority) = row;
1215
1216 edges.push(GraphEdge {
1217 from_id: tid.clone(),
1218 to_id: to_id.clone(),
1219 dep_type: dep_type_str,
1220 });
1221
1222 if !seen_tasks.contains(&task_id) {
1223 seen_tasks.insert(task_id.clone());
1224 nodes.push(GraphNode {
1225 id: task_id.clone(),
1226 title: title.unwrap_or_default(),
1227 status,
1228 priority,
1229 });
1230 next_level.push(task_id);
1231 }
1232 }
1233 }
1234 current_level = next_level;
1235 }
1236 } else {
1237 let sql = format!(
1239 "SELECT d.from_task_id, d.to_task_id, d.dep_type
1240 FROM dependencies d
1241 JOIN tasks t1 ON d.from_task_id = t1.id
1242 JOIN tasks t2 ON d.to_task_id = t2.id
1243 WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL {}",
1244 type_clause
1245 );
1246
1247 let mut stmt = conn.prepare(&sql)?;
1248 let edge_rows = stmt.query_map([], |row| {
1249 Ok(GraphEdge {
1250 from_id: row.get(0)?,
1251 to_id: row.get(1)?,
1252 dep_type: row.get(2)?,
1253 })
1254 })?;
1255
1256 for edge in edge_rows.flatten() {
1257 seen_tasks.insert(edge.from_id.clone());
1258 seen_tasks.insert(edge.to_id.clone());
1259 edges.push(edge);
1260 }
1261
1262 if !seen_tasks.is_empty() {
1264 let placeholders: String = seen_tasks.iter()
1265 .enumerate()
1266 .map(|(i, _)| format!("?{}", i + 1))
1267 .collect::<Vec<_>>()
1268 .join(", ");
1269
1270 let node_sql = format!(
1271 "SELECT id, title, status, priority FROM tasks
1272 WHERE id IN ({}) AND deleted_at IS NULL",
1273 placeholders
1274 );
1275
1276 let mut stmt = conn.prepare(&node_sql)?;
1277 let params_vec: Vec<String> = seen_tasks.iter().cloned().collect();
1278 let params_refs: Vec<&dyn rusqlite::ToSql> = params_vec
1279 .iter()
1280 .map(|s| s as &dyn rusqlite::ToSql)
1281 .collect();
1282
1283 let node_rows = stmt.query_map(params_refs.as_slice(), |row| {
1284 Ok(GraphNode {
1285 id: row.get(0)?,
1286 title: row.get::<_, Option<String>>(1)?.unwrap_or_default(),
1287 status: row.get(2)?,
1288 priority: row.get(3)?,
1289 })
1290 })?;
1291
1292 for node in node_rows.flatten() {
1293 nodes.push(node);
1294 }
1295 }
1296 }
1297
1298 Ok(DependencyGraph { nodes, edges })
1299 })
1300 }
1301
1302 pub fn get_dependency_graph_stats(&self) -> Result<DependencyGraphStats> {
1304 self.with_conn(|conn| {
1305 let total_tasks: i64 = conn.query_row(
1306 "SELECT COUNT(*) FROM tasks WHERE deleted_at IS NULL",
1307 [],
1308 |row| row.get(0),
1309 )?;
1310
1311 let total_deps: i64 = conn.query_row(
1312 "SELECT COUNT(*) FROM dependencies d
1313 JOIN tasks t1 ON d.from_task_id = t1.id
1314 JOIN tasks t2 ON d.to_task_id = t2.id
1315 WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL",
1316 [],
1317 |row| row.get(0),
1318 )?;
1319
1320 let blocks_count: i64 = conn.query_row(
1321 "SELECT COUNT(*) FROM dependencies d
1322 JOIN tasks t1 ON d.from_task_id = t1.id
1323 JOIN tasks t2 ON d.to_task_id = t2.id
1324 WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL AND d.dep_type = 'blocks'",
1325 [],
1326 |row| row.get(0),
1327 )?;
1328
1329 let follows_count: i64 = conn.query_row(
1330 "SELECT COUNT(*) FROM dependencies d
1331 JOIN tasks t1 ON d.from_task_id = t1.id
1332 JOIN tasks t2 ON d.to_task_id = t2.id
1333 WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL AND d.dep_type = 'follows'",
1334 [],
1335 |row| row.get(0),
1336 )?;
1337
1338 let contains_count: i64 = conn.query_row(
1339 "SELECT COUNT(*) FROM dependencies d
1340 JOIN tasks t1 ON d.from_task_id = t1.id
1341 JOIN tasks t2 ON d.to_task_id = t2.id
1342 WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL AND d.dep_type = 'contains'",
1343 [],
1344 |row| row.get(0),
1345 )?;
1346
1347 Ok(DependencyGraphStats {
1348 total_tasks,
1349 total_deps,
1350 blocks_count,
1351 follows_count,
1352 contains_count,
1353 })
1354 })
1355 }
1356
1357 pub fn get_available_phases(&self) -> Result<Vec<String>> {
1359 self.with_conn(|conn| {
1360 let mut stmt = conn.prepare(
1361 "SELECT DISTINCT phase FROM tasks WHERE phase IS NOT NULL AND deleted_at IS NULL ORDER BY phase"
1362 )?;
1363
1364 let phases = stmt
1365 .query_map([], |row| row.get(0))?
1366 .filter_map(|r| r.ok())
1367 .collect();
1368
1369 Ok(phases)
1370 })
1371 }
1372}
1373
/// One file mark as presented on the dashboard.
///
/// NOTE(review): the code that fills this struct is outside this chunk; the
/// field notes below are read off the names and types and should be confirmed
/// against the loader.
#[derive(Debug, Clone)]
pub struct DashboardFileMark {
    /// Path of the file the mark refers to.
    pub file_path: String,
    /// Identifier of the worker associated with the mark.
    pub worker_id: String,
    /// Optional free-form reason; `None` when no reason is stored.
    pub reason: Option<String>,
    /// When the mark was taken. Presumably a Unix timestamp; the unit
    /// (seconds vs. milliseconds) is not visible here — TODO confirm.
    pub locked_at: i64,
    /// Optional identifier of a task linked to the mark.
    pub task_id: Option<String>,
}
1383
/// Aggregate counters describing the current set of file marks.
///
/// NOTE(review): the queries that compute these counters are outside this
/// chunk; the per-field notes are inferred from the names — confirm.
#[derive(Debug, Clone)]
pub struct FileMarksStats {
    /// Total number of marks.
    pub total_marks: i64,
    /// Number of distinct agents holding marks.
    pub unique_agents: i64,
    /// Number of marks that are linked to a task.
    pub with_tasks: i64,
    /// Number of marks considered stale; the staleness rule is not visible
    /// here — TODO confirm.
    pub stale_marks: i64,
}
1392
/// Headline totals for the metrics view.
///
/// NOTE(review): the query that fills this struct is outside this chunk;
/// units are taken from the field-name suffixes (`_usd`, `_ms`) — confirm.
#[derive(Debug, Clone)]
pub struct MetricsOverview {
    /// Number of tasks included in the overview.
    pub total_tasks: i64,
    /// Number of those tasks that are completed.
    pub completed_tasks: i64,
    /// Summed cost, in US dollars per the field name.
    pub total_cost_usd: f64,
    /// Summed time, in milliseconds per the field name.
    pub total_time_ms: i64,
    /// Summed points across the included tasks.
    pub total_points: i64,
    /// Summed points across the completed tasks.
    pub completed_points: i64,
}
1405
/// One bucket of a velocity series.
///
/// NOTE(review): how periods are bucketed and labelled is decided outside this
/// chunk — confirm against the producing query.
#[derive(Debug, Clone)]
pub struct VelocityDataPoint {
    /// Display label for the period this bucket covers.
    pub period_label: String,
    /// Number of completions counted in the period.
    pub completed_count: i64,
    /// Points summed over the period.
    pub total_points: i64,
}
1413
/// Duration statistics for a single task status.
///
/// NOTE(review): the computation lives outside this chunk; milliseconds are
/// taken from the `_ms` suffixes — confirm.
#[derive(Debug, Clone)]
pub struct TimeInStatusStats {
    /// The status these figures describe.
    pub status: String,
    /// Average duration spent in the status, in milliseconds.
    pub avg_duration_ms: i64,
    /// Total duration spent in the status, in milliseconds.
    pub total_duration_ms: i64,
    /// Number of transitions that contributed to the figures.
    pub transition_count: i64,
}
1422
/// Cost and workload totals for a single worker.
///
/// NOTE(review): the aggregation query is outside this chunk; units are taken
/// from the field-name suffixes (`_usd`, `_ms`) — confirm.
#[derive(Debug, Clone)]
pub struct AgentCostStats {
    /// Identifier of the worker the totals belong to.
    pub worker_id: String,
    /// Summed cost for the worker, in US dollars per the field name.
    pub total_cost_usd: f64,
    /// Number of tasks attributed to the worker.
    pub task_count: i64,
    /// Number of those tasks that are completed.
    pub completed_count: i64,
    /// Summed time for the worker, in milliseconds per the field name.
    pub total_time_ms: i64,
}
1432
/// Aggregated values for the custom metric slots.
#[derive(Debug, Clone)]
pub struct CustomMetricsAggregate {
    /// Eight `i64` values, one per slot.
    // NOTE(review): what each index means and how the values are aggregated
    // (sum, average, ...) is not visible in this chunk — confirm.
    pub metrics: [i64; 8],
}
1438
/// A task rendered as a node of the dependency graph.
///
/// Built from the `id`, `title`, `status` and `priority` columns of a `tasks`
/// row. Derives `serde::Serialize`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphNode {
    /// Task id.
    pub id: String,
    /// Task title; a NULL title in the database becomes the empty string.
    pub title: String,
    /// Task status, as stored.
    pub status: String,
    /// Task priority, as stored.
    pub priority: i32,
}
1449
/// A directed edge of the dependency graph, taken from one `dependencies` row.
///
/// Derives `serde::Serialize`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphEdge {
    /// Id of the task the dependency starts at (`from_task_id`).
    pub from_id: String,
    /// Id of the task the dependency points to (`to_task_id`).
    pub to_id: String,
    /// The row's `dep_type` value; the statistics queries in this file
    /// recognise `blocks`, `follows` and `contains`.
    pub dep_type: String,
}
1457
/// A dependency graph as a flat list of nodes plus a flat list of edges.
///
/// Derives `serde::Serialize`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DependencyGraph {
    /// The tasks that appear in the graph.
    pub nodes: Vec<GraphNode>,
    /// The dependencies between tasks; edges refer to tasks by id.
    pub edges: Vec<GraphEdge>,
}
1464
/// Summary counts for the dependency graph, as returned by
/// `get_dependency_graph_stats`.
///
/// Soft-deleted tasks are excluded from every count: a dependency is counted
/// only when both of its endpoint tasks have `deleted_at IS NULL`.
/// Derives `serde::Serialize`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DependencyGraphStats {
    /// Number of tasks that are not soft-deleted.
    pub total_tasks: i64,
    /// Number of dependencies between live tasks, of any type.
    pub total_deps: i64,
    /// Number of those dependencies whose `dep_type` is `blocks`.
    pub blocks_count: i64,
    /// Number of those dependencies whose `dep_type` is `follows`.
    pub follows_count: i64,
    /// Number of those dependencies whose `dep_type` is `contains`.
    pub contains_count: i64,
}