1use anyhow::Result;
2use rusqlite::{Connection, params};
3use std::path::Path;
4
5pub use crate::persistence::sqlite::SQLITE_BUSY_TIMEOUT_MS;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub struct ProjectResetStats {
10 pub tasks: u64,
12 pub task_items: u64,
14 pub command_runs: u64,
16 pub events: u64,
18 pub tickets_cleaned: u64,
20}
21
22#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct TaskReference {
25 pub task_id: String,
27 pub status: String,
29}
30
31#[derive(Debug, Clone)]
33pub struct TaskExecutionMetric {
34 pub task_id: String,
36 pub status: String,
38 pub current_cycle: u32,
40 pub unresolved_items: i64,
42 pub total_items: i64,
44 pub failed_items: i64,
46 pub command_runs: i64,
48 pub created_at: String,
50}
51
52#[derive(Debug, Clone)]
54pub struct ControlPlaneAuditRecord {
55 pub transport: String,
57 pub remote_addr: Option<String>,
59 pub rpc: String,
61 pub subject_id: Option<String>,
63 pub authn_result: String,
65 pub authz_result: String,
67 pub role: Option<String>,
69 pub reason: Option<String>,
71 pub tls_fingerprint: Option<String>,
73 pub rejection_stage: Option<String>,
75 pub traffic_class: Option<String>,
77 pub limit_scope: Option<String>,
79 pub decision: Option<String>,
81 pub reason_code: Option<String>,
83}
84
85#[derive(Debug, Clone)]
87pub struct PluginAuditRecord {
88 pub action: String,
90 pub crd_kind: String,
92 pub plugin_name: Option<String>,
94 pub plugin_type: Option<String>,
96 pub command: String,
98 pub applied_by: Option<String>,
100 pub transport: Option<String>,
102 pub peer_pid: Option<i32>,
104 pub result: String,
106 pub policy_mode: Option<String>,
108}
109
110pub fn insert_plugin_audit(db_path: &Path, record: &PluginAuditRecord) -> Result<()> {
112 let conn = open_conn(db_path)?;
113 conn.execute(
114 "INSERT INTO plugin_audit (
115 created_at, action, crd_kind, plugin_name, plugin_type,
116 command, applied_by, transport, peer_pid, result, policy_mode
117 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
118 params![
119 crate::config_load::now_ts(),
120 record.action,
121 record.crd_kind,
122 record.plugin_name,
123 record.plugin_type,
124 record.command,
125 record.applied_by,
126 record.transport,
127 record.peer_pid,
128 record.result,
129 record.policy_mode,
130 ],
131 )?;
132 Ok(())
133}
134
135pub fn open_conn(db_path: &Path) -> Result<Connection> {
137 crate::persistence::sqlite::open_conn(db_path)
138}
139
140pub fn configure_conn(conn: &Connection) -> Result<()> {
142 crate::persistence::sqlite::configure_conn(conn)
143}
144
145pub fn init_schema(db_path: &Path) -> Result<()> {
147 crate::persistence::schema::PersistenceBootstrap::ensure_current(db_path)?;
148 Ok(())
149}
150
151pub fn count_non_terminal_tasks_by_workspace(
153 conn: &Connection,
154 project_id: &str,
155 workspace_id: &str,
156) -> Result<i64> {
157 let count: i64 = conn.query_row(
158 "SELECT COUNT(*) FROM tasks
159 WHERE project_id = ?1
160 AND workspace_id = ?2
161 AND status IN ('created', 'pending', 'running', 'restart_pending')",
162 params![project_id, workspace_id],
163 |row| row.get(0),
164 )?;
165 Ok(count)
166}
167
168pub fn count_non_terminal_tasks_by_workflow(
170 conn: &Connection,
171 project_id: &str,
172 workflow_id: &str,
173) -> Result<i64> {
174 let count: i64 = conn.query_row(
175 "SELECT COUNT(*) FROM tasks
176 WHERE project_id = ?1
177 AND workflow_id = ?2
178 AND status IN ('created', 'pending', 'running', 'restart_pending')",
179 params![project_id, workflow_id],
180 |row| row.get(0),
181 )?;
182 Ok(count)
183}
184
185pub fn list_non_terminal_tasks_by_workspace(
187 conn: &Connection,
188 project_id: &str,
189 workspace_id: &str,
190 limit: usize,
191) -> Result<Vec<TaskReference>> {
192 let mut stmt = conn.prepare(
193 "SELECT id, status FROM tasks
194 WHERE project_id = ?1
195 AND workspace_id = ?2
196 AND status IN ('created', 'pending', 'running', 'restart_pending')
197 ORDER BY created_at ASC
198 LIMIT ?3",
199 )?;
200 let rows = stmt.query_map(params![project_id, workspace_id, limit], |row| {
201 Ok(TaskReference {
202 task_id: row.get(0)?,
203 status: row.get(1)?,
204 })
205 })?;
206 let mut tasks = Vec::new();
207 for row in rows {
208 tasks.push(row?);
209 }
210 Ok(tasks)
211}
212
213pub fn list_non_terminal_tasks_by_workflow(
215 conn: &Connection,
216 project_id: &str,
217 workflow_id: &str,
218 limit: usize,
219) -> Result<Vec<TaskReference>> {
220 let mut stmt = conn.prepare(
221 "SELECT id, status FROM tasks
222 WHERE project_id = ?1
223 AND workflow_id = ?2
224 AND status IN ('created', 'pending', 'running', 'restart_pending')
225 ORDER BY created_at ASC
226 LIMIT ?3",
227 )?;
228 let rows = stmt.query_map(params![project_id, workflow_id, limit], |row| {
229 Ok(TaskReference {
230 task_id: row.get(0)?,
231 status: row.get(1)?,
232 })
233 })?;
234 let mut tasks = Vec::new();
235 for row in rows {
236 tasks.push(row?);
237 }
238 Ok(tasks)
239}
240
241pub fn reset_db(
243 state: &crate::state::InnerState,
244 include_history: bool,
245 include_config: bool,
246) -> Result<()> {
247 reset_db_by_path(&state.db_path, include_history, include_config)
248}
249
250pub fn reset_db_by_path(db_path: &Path, include_history: bool, include_config: bool) -> Result<()> {
252 let conn = open_conn(db_path)?;
253
254 let active_count: i64 = conn
255 .query_row(
256 "SELECT COUNT(*) FROM tasks WHERE status IN ('running', 'restart_pending')",
257 [],
258 |row| row.get(0),
259 )
260 .unwrap_or(0);
261 if active_count > 0 {
262 anyhow::bail!(
263 "db reset blocked: {} active task(s) with status running/restart_pending. \
264 Use `project reset <project> --force` for project-scoped cleanup instead.",
265 active_count
266 );
267 }
268
269 conn.execute("DELETE FROM events", [])?;
270 let _ = conn.execute("DELETE FROM task_graph_snapshots", []);
271 let _ = conn.execute("DELETE FROM task_graph_runs", []);
272 conn.execute("DELETE FROM command_runs", [])?;
273 conn.execute("DELETE FROM task_items", [])?;
274 conn.execute("DELETE FROM tasks", [])?;
275 conn.execute("DELETE FROM task_execution_metrics", [])?;
276 let _ = conn.execute("DELETE FROM control_plane_audit", []);
277 if include_config {
278 conn.execute("DELETE FROM orchestrator_config_versions", [])?;
279 } else if include_history {
280 conn.execute(
281 "DELETE FROM orchestrator_config_versions WHERE version < (SELECT COALESCE(MAX(version), 0) FROM orchestrator_config_versions)",
282 [],
283 )?;
284 }
285 Ok(())
286}
287
288pub fn insert_control_plane_audit(db_path: &Path, record: &ControlPlaneAuditRecord) -> Result<()> {
290 let conn = open_conn(db_path)?;
291 conn.execute(
292 "INSERT INTO control_plane_audit (
293 created_at, transport, remote_addr, rpc, subject_id, authn_result,
294 authz_result, role, reason, tls_fingerprint, rejection_stage,
295 traffic_class, limit_scope, decision, reason_code
296 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
297 params![
298 crate::config_load::now_ts(),
299 record.transport,
300 record.remote_addr,
301 record.rpc,
302 record.subject_id,
303 record.authn_result,
304 record.authz_result,
305 record.role,
306 record.reason,
307 record.tls_fingerprint,
308 record.rejection_stage,
309 record.traffic_class,
310 record.limit_scope,
311 record.decision,
312 record.reason_code,
313 ],
314 )?;
315 Ok(())
316}
317
318pub fn insert_task_execution_metric(db_path: &Path, metric: &TaskExecutionMetric) -> Result<()> {
320 let conn = open_conn(db_path)?;
321 conn.execute(
322 "INSERT INTO task_execution_metrics (task_id, status, current_cycle, unresolved_items, total_items, failed_items, command_runs, created_at)
323 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
324 params![
325 metric.task_id,
326 metric.status,
327 metric.current_cycle as i64,
328 metric.unresolved_items,
329 metric.total_items,
330 metric.failed_items,
331 metric.command_runs,
332 metric.created_at
333 ],
334 )?;
335 Ok(())
336}
337
338pub fn reset_project_data(
340 state: &crate::state::InnerState,
341 project_id: &str,
342) -> Result<ProjectResetStats> {
343 let conn = open_conn(&state.db_path)?;
344 let tx = conn.unchecked_transaction()?;
345
346 let tasks: i64 = tx.query_row(
347 "SELECT COUNT(*) FROM tasks WHERE project_id = ?1",
348 params![project_id],
349 |row| row.get(0),
350 )?;
351 let task_items: i64 = tx.query_row(
352 "SELECT COUNT(*) FROM task_items WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
353 params![project_id],
354 |row| row.get(0),
355 )?;
356 let command_runs: i64 = tx.query_row(
357 "SELECT COUNT(*) FROM command_runs WHERE task_item_id IN (
358 SELECT ti.id FROM task_items ti
359 JOIN tasks t ON t.id = ti.task_id
360 WHERE t.project_id = ?1
361 )",
362 params![project_id],
363 |row| row.get(0),
364 )?;
365 let events: i64 = tx.query_row(
366 "SELECT COUNT(*) FROM events WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
367 params![project_id],
368 |row| row.get(0),
369 )?;
370
371 tx.execute(
372 "DELETE FROM task_graph_snapshots WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
373 params![project_id],
374 )?;
375 tx.execute(
376 "DELETE FROM task_graph_runs WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
377 params![project_id],
378 )?;
379 tx.execute(
380 "DELETE FROM command_runs WHERE task_item_id IN (
381 SELECT ti.id FROM task_items ti
382 JOIN tasks t ON t.id = ti.task_id
383 WHERE t.project_id = ?1
384 )",
385 params![project_id],
386 )?;
387 tx.execute(
388 "DELETE FROM events WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
389 params![project_id],
390 )?;
391 tx.execute(
392 "DELETE FROM task_items WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
393 params![project_id],
394 )?;
395 tx.execute(
396 "DELETE FROM task_execution_metrics WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
397 params![project_id],
398 )?;
399 tx.execute(
400 "DELETE FROM tasks WHERE project_id = ?1",
401 params![project_id],
402 )?;
403
404 tx.commit()?;
405
406 Ok(ProjectResetStats {
407 tasks: tasks.max(0) as u64,
408 task_items: task_items.max(0) as u64,
409 command_runs: command_runs.max(0) as u64,
410 events: events.max(0) as u64,
411 tickets_cleaned: 0,
412 })
413}
414
415#[cfg(test)]
416mod tests {
417 use super::*;
418 use crate::dto::CreateTaskPayload;
419 use crate::task_ops::create_task_impl;
420 use crate::test_utils::TestState;
421
422 fn tmp_db_path() -> (std::path::PathBuf, std::path::PathBuf) {
423 let dir = std::env::temp_dir().join(format!("db-test-{}", uuid::Uuid::new_v4()));
424 std::fs::create_dir_all(&dir).expect("create tmp dir");
425 let db_path = dir.join("test.db");
426 (dir, db_path)
427 }
428
429 #[test]
432 fn open_conn_creates_connection() {
433 let (_dir, db_path) = tmp_db_path();
434 init_schema(&db_path).expect("init_schema");
435
436 let conn = open_conn(&db_path).expect("open_conn");
437 let fk: i64 = conn
439 .query_row("PRAGMA foreign_keys", [], |row| row.get(0))
440 .expect("pragma");
441 assert_eq!(fk, 1);
442 }
443
444 #[test]
447 fn init_schema_creates_tables() {
448 let (_dir, db_path) = tmp_db_path();
449 init_schema(&db_path).expect("init_schema");
450
451 let conn = open_conn(&db_path).expect("open_conn");
452 let tables: Vec<String> = {
453 let mut stmt = conn
454 .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
455 .expect("prepare");
456 stmt.query_map([], |row| row.get(0))
457 .expect("query")
458 .collect::<std::result::Result<Vec<_>, _>>()
459 .expect("collect")
460 };
461
462 assert!(tables.contains(&"tasks".to_string()));
463 assert!(tables.contains(&"task_items".to_string()));
464 assert!(tables.contains(&"command_runs".to_string()));
465 assert!(tables.contains(&"events".to_string()));
466 assert!(tables.contains(&"agent_sessions".to_string()));
467 assert!(tables.contains(&"session_attachments".to_string()));
468 }
469
470 #[test]
471 fn init_schema_is_idempotent() {
472 let (_dir, db_path) = tmp_db_path();
473 init_schema(&db_path).expect("first init");
474 init_schema(&db_path).expect("second init should succeed");
475 }
476
477 #[test]
480 fn count_non_terminal_tasks_by_workspace_returns_zero_initially() {
481 let (_dir, db_path) = tmp_db_path();
482 init_schema(&db_path).expect("init_schema");
483
484 let conn = open_conn(&db_path).expect("open_conn");
485 let count =
486 count_non_terminal_tasks_by_workspace(&conn, "default", "nonexistent").expect("count");
487 assert_eq!(count, 0);
488 }
489
490 #[test]
491 fn count_non_terminal_tasks_by_workspace_counts_correctly() {
492 let mut fixture = TestState::new();
493 let state = fixture.build();
494
495 let qa_file = state
496 .data_dir
497 .join("workspace/default/docs/qa/count_ws_test.md");
498 std::fs::write(&qa_file, "# count ws test\n").expect("seed qa file");
499
500 create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
501 create_task_impl(&state, CreateTaskPayload::default()).expect("task 2");
502
503 let conn = open_conn(&state.db_path).expect("open sqlite");
504 let count = count_non_terminal_tasks_by_workspace(
505 &conn,
506 crate::config::DEFAULT_PROJECT_ID,
507 "default",
508 )
509 .expect("count");
510 assert_eq!(count, 2);
511 }
512
513 #[test]
514 fn count_non_terminal_tasks_by_workflow_returns_zero_initially() {
515 let (_dir, db_path) = tmp_db_path();
516 init_schema(&db_path).expect("init_schema");
517
518 let conn = open_conn(&db_path).expect("open_conn");
519 let count =
520 count_non_terminal_tasks_by_workflow(&conn, "default", "nonexistent").expect("count");
521 assert_eq!(count, 0);
522 }
523
524 #[test]
525 fn count_non_terminal_tasks_by_workflow_counts_correctly() {
526 let mut fixture = TestState::new();
527 let state = fixture.build();
528
529 let qa_file = state
530 .data_dir
531 .join("workspace/default/docs/qa/count_wf_test.md");
532 std::fs::write(&qa_file, "# count wf test\n").expect("seed qa file");
533
534 create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
535
536 let conn = open_conn(&state.db_path).expect("open sqlite");
537 let count =
538 count_non_terminal_tasks_by_workflow(&conn, crate::config::DEFAULT_PROJECT_ID, "basic")
539 .expect("count");
540 assert_eq!(count, 1);
541 }
542
543 #[test]
546 fn insert_task_execution_metric_stores_row() {
547 let (_dir, db_path) = tmp_db_path();
548 init_schema(&db_path).expect("init_schema");
549
550 let metric = TaskExecutionMetric {
551 task_id: "task-123".to_string(),
552 status: "running".to_string(),
553 current_cycle: 2,
554 unresolved_items: 3,
555 total_items: 10,
556 failed_items: 1,
557 command_runs: 5,
558 created_at: "2026-01-01T00:00:00Z".to_string(),
559 };
560 insert_task_execution_metric(&db_path, &metric).expect("insert metric");
561
562 let conn = open_conn(&db_path).expect("open sqlite");
563 let (tid, status, cycle, unresolved, total, failed, runs): (
564 String, String, i64, i64, i64, i64, i64,
565 ) = conn
566 .query_row(
567 "SELECT task_id, status, current_cycle, unresolved_items, total_items, failed_items, command_runs FROM task_execution_metrics WHERE task_id = ?1",
568 params!["task-123"],
569 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?, row.get(5)?, row.get(6)?)),
570 )
571 .expect("query metric");
572
573 assert_eq!(tid, "task-123");
574 assert_eq!(status, "running");
575 assert_eq!(cycle, 2);
576 assert_eq!(unresolved, 3);
577 assert_eq!(total, 10);
578 assert_eq!(failed, 1);
579 assert_eq!(runs, 5);
580 }
581
582 #[test]
585 fn reset_db_clears_data() {
586 let mut fixture = TestState::new();
587 let state = fixture.build();
588
589 let qa_file = state
590 .data_dir
591 .join("workspace/default/docs/qa/reset_test.md");
592 std::fs::write(&qa_file, "# reset test\n").expect("seed qa file");
593
594 create_task_impl(&state, CreateTaskPayload::default()).expect("create task");
595
596 let conn = open_conn(&state.db_path).expect("open sqlite");
598 let before: i64 = conn
599 .query_row("SELECT COUNT(*) FROM tasks", [], |row| row.get(0))
600 .expect("count before");
601 assert!(before > 0);
602 drop(conn);
603
604 reset_db(&state, false, false).expect("reset_db");
605
606 let conn = open_conn(&state.db_path).expect("open sqlite");
607 let after: i64 = conn
608 .query_row("SELECT COUNT(*) FROM tasks", [], |row| row.get(0))
609 .expect("count after");
610 assert_eq!(after, 0);
611 }
612
613 #[test]
614 fn reset_db_with_config_clears_config() {
615 let mut fixture = TestState::new();
616 let state = fixture.build();
617
618 let conn = open_conn(&state.db_path).expect("open sqlite");
620 let versions_before: i64 = conn
621 .query_row(
622 "SELECT COUNT(*) FROM orchestrator_config_versions",
623 [],
624 |row| row.get(0),
625 )
626 .expect("count config versions before");
627 assert!(versions_before > 0);
628 drop(conn);
629
630 reset_db(&state, false, true).expect("reset_db with config");
631
632 let conn = open_conn(&state.db_path).expect("open sqlite");
633 let versions_after: i64 = conn
634 .query_row(
635 "SELECT COUNT(*) FROM orchestrator_config_versions",
636 [],
637 |row| row.get(0),
638 )
639 .expect("count config versions after");
640 assert_eq!(versions_after, 0);
641 }
642
643 #[test]
644 fn reset_db_blocked_when_running_task_exists() {
645 let mut fixture = TestState::new();
646 let state = fixture.build();
647
648 let qa_file = state
649 .data_dir
650 .join("workspace/default/docs/qa/guard_test.md");
651 std::fs::write(&qa_file, "# guard test\n").expect("seed qa file");
652
653 let task = create_task_impl(&state, CreateTaskPayload::default()).expect("create task");
654
655 let conn = open_conn(&state.db_path).expect("open sqlite");
657 conn.execute(
658 "UPDATE tasks SET status = 'running' WHERE id = ?1",
659 params![task.id],
660 )
661 .expect("set task running");
662 drop(conn);
663
664 let result = reset_db(&state, false, false);
665 assert!(result.is_err());
666 let err = result.expect_err("should be blocked").to_string();
667 assert!(
668 err.contains("db reset blocked"),
669 "unexpected error: {}",
670 err
671 );
672 }
673
674 #[test]
675 fn reset_db_blocked_when_restart_pending_task_exists() {
676 let mut fixture = TestState::new();
677 let state = fixture.build();
678
679 let qa_file = state
680 .data_dir
681 .join("workspace/default/docs/qa/restart_guard.md");
682 std::fs::write(&qa_file, "# restart guard\n").expect("seed qa file");
683
684 let task = create_task_impl(&state, CreateTaskPayload::default()).expect("create task");
685
686 let conn = open_conn(&state.db_path).expect("open sqlite");
687 conn.execute(
688 "UPDATE tasks SET status = 'restart_pending' WHERE id = ?1",
689 params![task.id],
690 )
691 .expect("set task restart_pending");
692 drop(conn);
693
694 let result = reset_db(&state, false, false);
695 assert!(result.is_err());
696 assert!(
697 result
698 .expect_err("should be blocked")
699 .to_string()
700 .contains("db reset blocked")
701 );
702 }
703
704 #[test]
707 fn reset_project_data_returns_zero_stats_for_unknown_project() {
708 let mut fixture = TestState::new();
709 let state = fixture.build();
710
711 let stats = reset_project_data(&state, "nonexistent-project").expect("reset project data");
712 assert_eq!(
713 stats,
714 ProjectResetStats {
715 tasks: 0,
716 task_items: 0,
717 command_runs: 0,
718 events: 0,
719 tickets_cleaned: 0,
720 }
721 );
722 }
723
724 #[test]
727 fn project_reset_stats_debug_and_eq() {
728 let a = ProjectResetStats {
729 tasks: 1,
730 task_items: 2,
731 command_runs: 3,
732 events: 4,
733 tickets_cleaned: 0,
734 };
735 let b = a;
736 assert_eq!(a, b);
737 let _debug = format!("{:?}", a);
739 }
740
741 #[test]
744 fn list_non_terminal_tasks_by_workspace_empty() {
745 let (_dir, db_path) = tmp_db_path();
746 init_schema(&db_path).expect("init_schema");
747
748 let conn = open_conn(&db_path).expect("open_conn");
749 let tasks =
750 list_non_terminal_tasks_by_workspace(&conn, "default", "ws1", 10).expect("list empty");
751 assert!(tasks.is_empty());
752 }
753
754 #[test]
755 fn list_non_terminal_tasks_by_workspace_returns_matching() {
756 let mut fixture = TestState::new();
757 let state = fixture.build();
758
759 let qa_file = state
760 .data_dir
761 .join("workspace/default/docs/qa/list_ws_test.md");
762 std::fs::write(&qa_file, "# list ws test\n").expect("seed qa file");
763
764 create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
765 create_task_impl(&state, CreateTaskPayload::default()).expect("task 2");
766
767 let conn = open_conn(&state.db_path).expect("open sqlite");
768 let tasks = list_non_terminal_tasks_by_workspace(
769 &conn,
770 crate::config::DEFAULT_PROJECT_ID,
771 "default",
772 10,
773 )
774 .expect("list");
775 assert_eq!(tasks.len(), 2);
776 assert_eq!(tasks[0].status, "created");
777 assert_eq!(tasks[1].status, "created");
778 }
779
780 #[test]
781 fn list_non_terminal_tasks_by_workspace_respects_limit() {
782 let mut fixture = TestState::new();
783 let state = fixture.build();
784
785 let qa_file = state
786 .data_dir
787 .join("workspace/default/docs/qa/limit_ws_test.md");
788 std::fs::write(&qa_file, "# limit ws test\n").expect("seed qa file");
789
790 create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
791 create_task_impl(&state, CreateTaskPayload::default()).expect("task 2");
792 create_task_impl(&state, CreateTaskPayload::default()).expect("task 3");
793
794 let conn = open_conn(&state.db_path).expect("open sqlite");
795 let tasks = list_non_terminal_tasks_by_workspace(
796 &conn,
797 crate::config::DEFAULT_PROJECT_ID,
798 "default",
799 2,
800 )
801 .expect("list limited");
802 assert_eq!(tasks.len(), 2);
803 }
804
805 #[test]
806 fn list_non_terminal_tasks_by_workspace_excludes_terminal() {
807 let mut fixture = TestState::new();
808 let state = fixture.build();
809
810 let qa_file = state
811 .data_dir
812 .join("workspace/default/docs/qa/terminal_ws_test.md");
813 std::fs::write(&qa_file, "# terminal ws test\n").expect("seed qa file");
814
815 let task = create_task_impl(&state, CreateTaskPayload::default()).expect("task");
816
817 let conn = open_conn(&state.db_path).expect("open sqlite");
819 conn.execute(
820 "UPDATE tasks SET status = 'completed' WHERE id = ?1",
821 params![task.id],
822 )
823 .expect("set task completed");
824
825 let tasks = list_non_terminal_tasks_by_workspace(
826 &conn,
827 crate::config::DEFAULT_PROJECT_ID,
828 "default",
829 10,
830 )
831 .expect("list");
832 assert!(tasks.is_empty());
833 }
834
835 #[test]
838 fn list_non_terminal_tasks_by_workflow_empty() {
839 let (_dir, db_path) = tmp_db_path();
840 init_schema(&db_path).expect("init_schema");
841
842 let conn = open_conn(&db_path).expect("open_conn");
843 let tasks =
844 list_non_terminal_tasks_by_workflow(&conn, "default", "wf1", 10).expect("list empty");
845 assert!(tasks.is_empty());
846 }
847
848 #[test]
849 fn list_non_terminal_tasks_by_workflow_returns_matching() {
850 let mut fixture = TestState::new();
851 let state = fixture.build();
852
853 let qa_file = state
854 .data_dir
855 .join("workspace/default/docs/qa/list_wf_test.md");
856 std::fs::write(&qa_file, "# list wf test\n").expect("seed qa file");
857
858 create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
859
860 let conn = open_conn(&state.db_path).expect("open sqlite");
861 let tasks = list_non_terminal_tasks_by_workflow(
862 &conn,
863 crate::config::DEFAULT_PROJECT_ID,
864 "basic",
865 10,
866 )
867 .expect("list");
868 assert_eq!(tasks.len(), 1);
869 assert_eq!(tasks[0].status, "created");
870 }
871
872 #[test]
873 fn list_non_terminal_tasks_by_workflow_respects_limit() {
874 let mut fixture = TestState::new();
875 let state = fixture.build();
876
877 let qa_file = state
878 .data_dir
879 .join("workspace/default/docs/qa/limit_wf_test.md");
880 std::fs::write(&qa_file, "# limit wf test\n").expect("seed qa file");
881
882 create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
883 create_task_impl(&state, CreateTaskPayload::default()).expect("task 2");
884 create_task_impl(&state, CreateTaskPayload::default()).expect("task 3");
885
886 let conn = open_conn(&state.db_path).expect("open sqlite");
887 let tasks = list_non_terminal_tasks_by_workflow(
888 &conn,
889 crate::config::DEFAULT_PROJECT_ID,
890 "basic",
891 1,
892 )
893 .expect("list limited");
894 assert_eq!(tasks.len(), 1);
895 }
896
897 #[test]
900 fn insert_control_plane_audit_stores_row() {
901 let (_dir, db_path) = tmp_db_path();
902 init_schema(&db_path).expect("init_schema");
903
904 let record = ControlPlaneAuditRecord {
905 transport: "grpc".to_string(),
906 remote_addr: Some("127.0.0.1:5000".to_string()),
907 rpc: "CreateTask".to_string(),
908 subject_id: Some("user-1".to_string()),
909 authn_result: "ok".to_string(),
910 authz_result: "allowed".to_string(),
911 role: Some("admin".to_string()),
912 reason: Some("normal access".to_string()),
913 tls_fingerprint: None,
914 rejection_stage: None,
915 traffic_class: None,
916 limit_scope: None,
917 decision: None,
918 reason_code: None,
919 };
920 insert_control_plane_audit(&db_path, &record).expect("insert audit");
921
922 let conn = open_conn(&db_path).expect("open sqlite");
923 let (transport, rpc, authn, authz): (String, String, String, String) = conn
924 .query_row(
925 "SELECT transport, rpc, authn_result, authz_result FROM control_plane_audit LIMIT 1",
926 [],
927 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
928 )
929 .expect("query audit");
930 assert_eq!(transport, "grpc");
931 assert_eq!(rpc, "CreateTask");
932 assert_eq!(authn, "ok");
933 assert_eq!(authz, "allowed");
934 }
935
936 #[test]
937 fn insert_control_plane_audit_with_none_fields() {
938 let (_dir, db_path) = tmp_db_path();
939 init_schema(&db_path).expect("init_schema");
940
941 let record = ControlPlaneAuditRecord {
942 transport: "uds".to_string(),
943 remote_addr: None,
944 rpc: "ListTasks".to_string(),
945 subject_id: None,
946 authn_result: "skipped".to_string(),
947 authz_result: "skipped".to_string(),
948 role: None,
949 reason: None,
950 tls_fingerprint: None,
951 rejection_stage: None,
952 traffic_class: None,
953 limit_scope: None,
954 decision: None,
955 reason_code: None,
956 };
957 insert_control_plane_audit(&db_path, &record).expect("insert audit with nones");
958
959 let conn = open_conn(&db_path).expect("open sqlite");
960 let count: i64 = conn
961 .query_row("SELECT COUNT(*) FROM control_plane_audit", [], |row| {
962 row.get(0)
963 })
964 .expect("count audit");
965 assert_eq!(count, 1);
966 }
967
968 #[test]
971 fn reset_db_with_history_keeps_latest_config_version() {
972 let mut fixture = TestState::new();
973 let state = fixture.build();
974
975 let conn = open_conn(&state.db_path).expect("open sqlite");
977 let versions_before: i64 = conn
978 .query_row(
979 "SELECT COUNT(*) FROM orchestrator_config_versions",
980 [],
981 |row| row.get(0),
982 )
983 .expect("count config versions before");
984 assert!(versions_before > 0);
985 drop(conn);
986
987 reset_db(&state, true, false).expect("reset_db with history");
990
991 let conn = open_conn(&state.db_path).expect("open sqlite");
992 let versions_after: i64 = conn
993 .query_row(
994 "SELECT COUNT(*) FROM orchestrator_config_versions",
995 [],
996 |row| row.get(0),
997 )
998 .expect("count config versions after");
999 assert!(versions_after <= 1, "Expected <= 1, got {}", versions_after);
1001 let tasks: i64 = conn
1003 .query_row("SELECT COUNT(*) FROM tasks", [], |row| row.get(0))
1004 .expect("count tasks");
1005 assert_eq!(tasks, 0);
1006 }
1007
1008 #[test]
1011 fn reset_project_data_clears_project_data_and_returns_stats() {
1012 let mut fixture = TestState::new();
1013 let state = fixture.build();
1014
1015 let qa_file = state
1016 .data_dir
1017 .join("workspace/default/docs/qa/proj_reset_test.md");
1018 std::fs::write(&qa_file, "# proj reset test\n").expect("seed qa file");
1019
1020 let task = create_task_impl(&state, CreateTaskPayload::default()).expect("create task");
1021
1022 let conn = open_conn(&state.db_path).expect("open sqlite");
1024 let task_count: i64 = conn
1025 .query_row(
1026 "SELECT COUNT(*) FROM tasks WHERE project_id = ?1",
1027 params![crate::config::DEFAULT_PROJECT_ID],
1028 |row| row.get(0),
1029 )
1030 .expect("count tasks");
1031 assert!(task_count > 0);
1032
1033 conn.execute(
1035 "INSERT INTO events (task_id, event_type, payload_json, created_at) VALUES (?1, ?2, ?3, ?4)",
1036 params![task.id, "test", "{}", crate::config_load::now_ts()],
1037 )
1038 .expect("insert event");
1039 drop(conn);
1040
1041 let stats =
1042 reset_project_data(&state, crate::config::DEFAULT_PROJECT_ID).expect("reset project");
1043 assert!(stats.tasks > 0, "expected tasks > 0, got {}", stats.tasks);
1044 assert_eq!(stats.tickets_cleaned, 0); let conn = open_conn(&state.db_path).expect("open sqlite after reset");
1048 let task_count_after: i64 = conn
1049 .query_row(
1050 "SELECT COUNT(*) FROM tasks WHERE project_id = ?1",
1051 params![crate::config::DEFAULT_PROJECT_ID],
1052 |row| row.get(0),
1053 )
1054 .expect("count tasks after");
1055 assert_eq!(task_count_after, 0);
1056
1057 let event_count_after: i64 = conn
1058 .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
1059 .expect("count events after");
1060 assert_eq!(event_count_after, 0);
1061 }
1062
1063 #[test]
1066 fn count_non_terminal_tasks_by_workspace_excludes_completed() {
1067 let mut fixture = TestState::new();
1068 let state = fixture.build();
1069
1070 let qa_file = state
1071 .data_dir
1072 .join("workspace/default/docs/qa/terminal_count_test.md");
1073 std::fs::write(&qa_file, "# terminal count test\n").expect("seed qa file");
1074
1075 let task = create_task_impl(&state, CreateTaskPayload::default()).expect("task");
1076
1077 let conn = open_conn(&state.db_path).expect("open sqlite");
1078 conn.execute(
1079 "UPDATE tasks SET status = 'completed' WHERE id = ?1",
1080 params![task.id],
1081 )
1082 .expect("set completed");
1083
1084 let count = count_non_terminal_tasks_by_workspace(
1085 &conn,
1086 crate::config::DEFAULT_PROJECT_ID,
1087 "default",
1088 )
1089 .expect("count");
1090 assert_eq!(count, 0);
1091 }
1092}