Skip to main content

agent_orchestrator/
db.rs

1use anyhow::Result;
2use rusqlite::{Connection, params};
3use std::path::Path;
4
5pub use crate::persistence::sqlite::SQLITE_BUSY_TIMEOUT_MS;
6
7/// Counts returned after deleting all persisted state for one project.
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub struct ProjectResetStats {
10    /// Number of task rows removed.
11    pub tasks: u64,
12    /// Number of task-item rows removed.
13    pub task_items: u64,
14    /// Number of command-run rows removed.
15    pub command_runs: u64,
16    /// Number of event rows removed.
17    pub events: u64,
18    /// Number of ticket files removed from disk.
19    pub tickets_cleaned: u64,
20}
21
22/// Minimal reference to a non-terminal task.
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct TaskReference {
25    /// Stable task identifier.
26    pub task_id: String,
27    /// Current task status label.
28    pub status: String,
29}
30
31/// Execution metrics materialized into the persistence layer.
32#[derive(Debug, Clone)]
33pub struct TaskExecutionMetric {
34    /// Stable task identifier.
35    pub task_id: String,
36    /// Task status associated with the metric sample.
37    pub status: String,
38    /// Workflow cycle active when the sample was recorded.
39    pub current_cycle: u32,
40    /// Number of unresolved task items at sample time.
41    pub unresolved_items: i64,
42    /// Total number of task items known for the task.
43    pub total_items: i64,
44    /// Number of failed task items at sample time.
45    pub failed_items: i64,
46    /// Number of command runs recorded so far.
47    pub command_runs: i64,
48    /// RFC 3339 timestamp when the metric was captured.
49    pub created_at: String,
50}
51
52/// Audit payload written for one control-plane authorization decision.
53#[derive(Debug, Clone)]
54pub struct ControlPlaneAuditRecord {
55    /// Transport used by the incoming RPC, such as `tcp`.
56    pub transport: String,
57    /// Remote peer address when known.
58    pub remote_addr: Option<String>,
59    /// Fully qualified RPC name.
60    pub rpc: String,
61    /// Authenticated subject identifier when available.
62    pub subject_id: Option<String>,
63    /// Authentication outcome label.
64    pub authn_result: String,
65    /// Authorization outcome label.
66    pub authz_result: String,
67    /// Effective role assigned to the subject.
68    pub role: Option<String>,
69    /// Human-readable reason for denial or fallback behavior.
70    pub reason: Option<String>,
71    /// SHA-256 fingerprint of the presented client certificate.
72    pub tls_fingerprint: Option<String>,
73    /// Pipeline stage that rejected the request.
74    pub rejection_stage: Option<String>,
75    /// Traffic bucket selected for protection enforcement.
76    pub traffic_class: Option<String>,
77    /// Whether subject-scoped or global limits produced the decision.
78    pub limit_scope: Option<String>,
79    /// Final decision label written by the limiter.
80    pub decision: Option<String>,
81    /// Stable machine-readable reason code.
82    pub reason_code: Option<String>,
83}
84
85/// Audit payload for plugin-related authorization and execution decisions.
86#[derive(Debug, Clone)]
87pub struct PluginAuditRecord {
88    /// Action: `crd_apply`, `plugin_execute`, or `hook_execute`.
89    pub action: String,
90    /// CRD kind that owns the plugin.
91    pub crd_kind: String,
92    /// Plugin name (None for hooks).
93    pub plugin_name: Option<String>,
94    /// Plugin type: `interceptor`, `transformer`, `cron`, or `hook`.
95    pub plugin_type: Option<String>,
96    /// Full command string.
97    pub command: String,
98    /// Caller identity (TLS subject_id or `uds:<pid>`).
99    pub applied_by: Option<String>,
100    /// Transport: `tcp` or `uds`.
101    pub transport: Option<String>,
102    /// Peer process ID (UDS only).
103    pub peer_pid: Option<i32>,
104    /// Verdict: `allowed`, `denied`, or `audit_warning`.
105    pub result: String,
106    /// Active policy mode: `deny`, `allowlist`, or `audit`.
107    pub policy_mode: Option<String>,
108}
109
110/// Inserts one plugin-audit record into persistence.
111pub fn insert_plugin_audit(db_path: &Path, record: &PluginAuditRecord) -> Result<()> {
112    let conn = open_conn(db_path)?;
113    conn.execute(
114        "INSERT INTO plugin_audit (
115            created_at, action, crd_kind, plugin_name, plugin_type,
116            command, applied_by, transport, peer_pid, result, policy_mode
117         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
118        params![
119            crate::config_load::now_ts(),
120            record.action,
121            record.crd_kind,
122            record.plugin_name,
123            record.plugin_type,
124            record.command,
125            record.applied_by,
126            record.transport,
127            record.peer_pid,
128            record.result,
129            record.policy_mode,
130        ],
131    )?;
132    Ok(())
133}
134
135/// Opens a SQLite connection using the orchestrator persistence defaults.
136pub fn open_conn(db_path: &Path) -> Result<Connection> {
137    crate::persistence::sqlite::open_conn(db_path)
138}
139
140/// Applies the standard busy timeout and pragma configuration to a connection.
141pub fn configure_conn(conn: &Connection) -> Result<()> {
142    crate::persistence::sqlite::configure_conn(conn)
143}
144
145/// Ensures the persistence schema exists and is migrated to the current version.
146pub fn init_schema(db_path: &Path) -> Result<()> {
147    crate::persistence::schema::PersistenceBootstrap::ensure_current(db_path)?;
148    Ok(())
149}
150
151/// Counts running or pending tasks for one project workspace pair.
152pub fn count_non_terminal_tasks_by_workspace(
153    conn: &Connection,
154    project_id: &str,
155    workspace_id: &str,
156) -> Result<i64> {
157    let count: i64 = conn.query_row(
158        "SELECT COUNT(*) FROM tasks
159         WHERE project_id = ?1
160           AND workspace_id = ?2
161           AND status IN ('created', 'pending', 'running', 'restart_pending')",
162        params![project_id, workspace_id],
163        |row| row.get(0),
164    )?;
165    Ok(count)
166}
167
168/// Counts running or pending tasks for one project workflow pair.
169pub fn count_non_terminal_tasks_by_workflow(
170    conn: &Connection,
171    project_id: &str,
172    workflow_id: &str,
173) -> Result<i64> {
174    let count: i64 = conn.query_row(
175        "SELECT COUNT(*) FROM tasks
176         WHERE project_id = ?1
177           AND workflow_id = ?2
178           AND status IN ('created', 'pending', 'running', 'restart_pending')",
179        params![project_id, workflow_id],
180        |row| row.get(0),
181    )?;
182    Ok(count)
183}
184
185/// Lists the oldest non-terminal tasks for one project workspace pair.
186pub fn list_non_terminal_tasks_by_workspace(
187    conn: &Connection,
188    project_id: &str,
189    workspace_id: &str,
190    limit: usize,
191) -> Result<Vec<TaskReference>> {
192    let mut stmt = conn.prepare(
193        "SELECT id, status FROM tasks
194         WHERE project_id = ?1
195           AND workspace_id = ?2
196           AND status IN ('created', 'pending', 'running', 'restart_pending')
197         ORDER BY created_at ASC
198         LIMIT ?3",
199    )?;
200    let rows = stmt.query_map(params![project_id, workspace_id, limit], |row| {
201        Ok(TaskReference {
202            task_id: row.get(0)?,
203            status: row.get(1)?,
204        })
205    })?;
206    let mut tasks = Vec::new();
207    for row in rows {
208        tasks.push(row?);
209    }
210    Ok(tasks)
211}
212
213/// Lists the oldest non-terminal tasks for one project workflow pair.
214pub fn list_non_terminal_tasks_by_workflow(
215    conn: &Connection,
216    project_id: &str,
217    workflow_id: &str,
218    limit: usize,
219) -> Result<Vec<TaskReference>> {
220    let mut stmt = conn.prepare(
221        "SELECT id, status FROM tasks
222         WHERE project_id = ?1
223           AND workflow_id = ?2
224           AND status IN ('created', 'pending', 'running', 'restart_pending')
225         ORDER BY created_at ASC
226         LIMIT ?3",
227    )?;
228    let rows = stmt.query_map(params![project_id, workflow_id, limit], |row| {
229        Ok(TaskReference {
230            task_id: row.get(0)?,
231            status: row.get(1)?,
232        })
233    })?;
234    let mut tasks = Vec::new();
235    for row in rows {
236        tasks.push(row?);
237    }
238    Ok(tasks)
239}
240
241/// Resets persisted runtime data using the active daemon state.
242pub fn reset_db(
243    state: &crate::state::InnerState,
244    include_history: bool,
245    include_config: bool,
246) -> Result<()> {
247    reset_db_by_path(&state.db_path, include_history, include_config)
248}
249
250/// Deletes persisted runtime data from a database path after guarding against active tasks.
251pub fn reset_db_by_path(db_path: &Path, include_history: bool, include_config: bool) -> Result<()> {
252    let conn = open_conn(db_path)?;
253
254    let active_count: i64 = conn
255        .query_row(
256            "SELECT COUNT(*) FROM tasks WHERE status IN ('running', 'restart_pending')",
257            [],
258            |row| row.get(0),
259        )
260        .unwrap_or(0);
261    if active_count > 0 {
262        anyhow::bail!(
263            "db reset blocked: {} active task(s) with status running/restart_pending. \
264             Use `project reset <project> --force` for project-scoped cleanup instead.",
265            active_count
266        );
267    }
268
269    conn.execute("DELETE FROM events", [])?;
270    let _ = conn.execute("DELETE FROM task_graph_snapshots", []);
271    let _ = conn.execute("DELETE FROM task_graph_runs", []);
272    conn.execute("DELETE FROM command_runs", [])?;
273    conn.execute("DELETE FROM task_items", [])?;
274    conn.execute("DELETE FROM tasks", [])?;
275    conn.execute("DELETE FROM task_execution_metrics", [])?;
276    let _ = conn.execute("DELETE FROM control_plane_audit", []);
277    if include_config {
278        conn.execute("DELETE FROM orchestrator_config_versions", [])?;
279    } else if include_history {
280        conn.execute(
281            "DELETE FROM orchestrator_config_versions WHERE version < (SELECT COALESCE(MAX(version), 0) FROM orchestrator_config_versions)",
282            [],
283        )?;
284    }
285    Ok(())
286}
287
288/// Inserts one control-plane audit record into persistence.
289pub fn insert_control_plane_audit(db_path: &Path, record: &ControlPlaneAuditRecord) -> Result<()> {
290    let conn = open_conn(db_path)?;
291    conn.execute(
292        "INSERT INTO control_plane_audit (
293            created_at, transport, remote_addr, rpc, subject_id, authn_result,
294            authz_result, role, reason, tls_fingerprint, rejection_stage,
295            traffic_class, limit_scope, decision, reason_code
296         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
297        params![
298            crate::config_load::now_ts(),
299            record.transport,
300            record.remote_addr,
301            record.rpc,
302            record.subject_id,
303            record.authn_result,
304            record.authz_result,
305            record.role,
306            record.reason,
307            record.tls_fingerprint,
308            record.rejection_stage,
309            record.traffic_class,
310            record.limit_scope,
311            record.decision,
312            record.reason_code,
313        ],
314    )?;
315    Ok(())
316}
317
318/// Inserts one task execution metric sample into persistence.
319pub fn insert_task_execution_metric(db_path: &Path, metric: &TaskExecutionMetric) -> Result<()> {
320    let conn = open_conn(db_path)?;
321    conn.execute(
322        "INSERT INTO task_execution_metrics (task_id, status, current_cycle, unresolved_items, total_items, failed_items, command_runs, created_at)
323         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
324        params![
325            metric.task_id,
326            metric.status,
327            metric.current_cycle as i64,
328            metric.unresolved_items,
329            metric.total_items,
330            metric.failed_items,
331            metric.command_runs,
332            metric.created_at
333        ],
334    )?;
335    Ok(())
336}
337
338/// Deletes all persisted records and ticket files associated with one project.
339pub fn reset_project_data(
340    state: &crate::state::InnerState,
341    project_id: &str,
342) -> Result<ProjectResetStats> {
343    let conn = open_conn(&state.db_path)?;
344    let tx = conn.unchecked_transaction()?;
345
346    let tasks: i64 = tx.query_row(
347        "SELECT COUNT(*) FROM tasks WHERE project_id = ?1",
348        params![project_id],
349        |row| row.get(0),
350    )?;
351    let task_items: i64 = tx.query_row(
352        "SELECT COUNT(*) FROM task_items WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
353        params![project_id],
354        |row| row.get(0),
355    )?;
356    let command_runs: i64 = tx.query_row(
357        "SELECT COUNT(*) FROM command_runs WHERE task_item_id IN (
358            SELECT ti.id FROM task_items ti
359            JOIN tasks t ON t.id = ti.task_id
360            WHERE t.project_id = ?1
361        )",
362        params![project_id],
363        |row| row.get(0),
364    )?;
365    let events: i64 = tx.query_row(
366        "SELECT COUNT(*) FROM events WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
367        params![project_id],
368        |row| row.get(0),
369    )?;
370
371    tx.execute(
372        "DELETE FROM task_graph_snapshots WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
373        params![project_id],
374    )?;
375    tx.execute(
376        "DELETE FROM task_graph_runs WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
377        params![project_id],
378    )?;
379    tx.execute(
380        "DELETE FROM command_runs WHERE task_item_id IN (
381            SELECT ti.id FROM task_items ti
382            JOIN tasks t ON t.id = ti.task_id
383            WHERE t.project_id = ?1
384        )",
385        params![project_id],
386    )?;
387    tx.execute(
388        "DELETE FROM events WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
389        params![project_id],
390    )?;
391    tx.execute(
392        "DELETE FROM task_items WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
393        params![project_id],
394    )?;
395    tx.execute(
396        "DELETE FROM task_execution_metrics WHERE task_id IN (SELECT id FROM tasks WHERE project_id = ?1)",
397        params![project_id],
398    )?;
399    tx.execute(
400        "DELETE FROM tasks WHERE project_id = ?1",
401        params![project_id],
402    )?;
403
404    tx.commit()?;
405
406    Ok(ProjectResetStats {
407        tasks: tasks.max(0) as u64,
408        task_items: task_items.max(0) as u64,
409        command_runs: command_runs.max(0) as u64,
410        events: events.max(0) as u64,
411        tickets_cleaned: 0,
412    })
413}
414
415#[cfg(test)]
416mod tests {
417    use super::*;
418    use crate::dto::CreateTaskPayload;
419    use crate::task_ops::create_task_impl;
420    use crate::test_utils::TestState;
421
422    fn tmp_db_path() -> (std::path::PathBuf, std::path::PathBuf) {
423        let dir = std::env::temp_dir().join(format!("db-test-{}", uuid::Uuid::new_v4()));
424        std::fs::create_dir_all(&dir).expect("create tmp dir");
425        let db_path = dir.join("test.db");
426        (dir, db_path)
427    }
428
429    // ── open_conn ──
430
431    #[test]
432    fn open_conn_creates_connection() {
433        let (_dir, db_path) = tmp_db_path();
434        init_schema(&db_path).expect("init_schema");
435
436        let conn = open_conn(&db_path).expect("open_conn");
437        // Verify foreign keys are enabled
438        let fk: i64 = conn
439            .query_row("PRAGMA foreign_keys", [], |row| row.get(0))
440            .expect("pragma");
441        assert_eq!(fk, 1);
442    }
443
444    // ── init_schema ──
445
446    #[test]
447    fn init_schema_creates_tables() {
448        let (_dir, db_path) = tmp_db_path();
449        init_schema(&db_path).expect("init_schema");
450
451        let conn = open_conn(&db_path).expect("open_conn");
452        let tables: Vec<String> = {
453            let mut stmt = conn
454                .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
455                .expect("prepare");
456            stmt.query_map([], |row| row.get(0))
457                .expect("query")
458                .collect::<std::result::Result<Vec<_>, _>>()
459                .expect("collect")
460        };
461
462        assert!(tables.contains(&"tasks".to_string()));
463        assert!(tables.contains(&"task_items".to_string()));
464        assert!(tables.contains(&"command_runs".to_string()));
465        assert!(tables.contains(&"events".to_string()));
466        assert!(tables.contains(&"agent_sessions".to_string()));
467        assert!(tables.contains(&"session_attachments".to_string()));
468    }
469
470    #[test]
471    fn init_schema_is_idempotent() {
472        let (_dir, db_path) = tmp_db_path();
473        init_schema(&db_path).expect("first init");
474        init_schema(&db_path).expect("second init should succeed");
475    }
476
477    // ── non-terminal task reference counts ──
478
479    #[test]
480    fn count_non_terminal_tasks_by_workspace_returns_zero_initially() {
481        let (_dir, db_path) = tmp_db_path();
482        init_schema(&db_path).expect("init_schema");
483
484        let conn = open_conn(&db_path).expect("open_conn");
485        let count =
486            count_non_terminal_tasks_by_workspace(&conn, "default", "nonexistent").expect("count");
487        assert_eq!(count, 0);
488    }
489
490    #[test]
491    fn count_non_terminal_tasks_by_workspace_counts_correctly() {
492        let mut fixture = TestState::new();
493        let state = fixture.build();
494
495        let qa_file = state
496            .data_dir
497            .join("workspace/default/docs/qa/count_ws_test.md");
498        std::fs::write(&qa_file, "# count ws test\n").expect("seed qa file");
499
500        create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
501        create_task_impl(&state, CreateTaskPayload::default()).expect("task 2");
502
503        let conn = open_conn(&state.db_path).expect("open sqlite");
504        let count = count_non_terminal_tasks_by_workspace(
505            &conn,
506            crate::config::DEFAULT_PROJECT_ID,
507            "default",
508        )
509        .expect("count");
510        assert_eq!(count, 2);
511    }
512
513    #[test]
514    fn count_non_terminal_tasks_by_workflow_returns_zero_initially() {
515        let (_dir, db_path) = tmp_db_path();
516        init_schema(&db_path).expect("init_schema");
517
518        let conn = open_conn(&db_path).expect("open_conn");
519        let count =
520            count_non_terminal_tasks_by_workflow(&conn, "default", "nonexistent").expect("count");
521        assert_eq!(count, 0);
522    }
523
524    #[test]
525    fn count_non_terminal_tasks_by_workflow_counts_correctly() {
526        let mut fixture = TestState::new();
527        let state = fixture.build();
528
529        let qa_file = state
530            .data_dir
531            .join("workspace/default/docs/qa/count_wf_test.md");
532        std::fs::write(&qa_file, "# count wf test\n").expect("seed qa file");
533
534        create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
535
536        let conn = open_conn(&state.db_path).expect("open sqlite");
537        let count =
538            count_non_terminal_tasks_by_workflow(&conn, crate::config::DEFAULT_PROJECT_ID, "basic")
539                .expect("count");
540        assert_eq!(count, 1);
541    }
542
543    // ── insert_task_execution_metric ──
544
545    #[test]
546    fn insert_task_execution_metric_stores_row() {
547        let (_dir, db_path) = tmp_db_path();
548        init_schema(&db_path).expect("init_schema");
549
550        let metric = TaskExecutionMetric {
551            task_id: "task-123".to_string(),
552            status: "running".to_string(),
553            current_cycle: 2,
554            unresolved_items: 3,
555            total_items: 10,
556            failed_items: 1,
557            command_runs: 5,
558            created_at: "2026-01-01T00:00:00Z".to_string(),
559        };
560        insert_task_execution_metric(&db_path, &metric).expect("insert metric");
561
562        let conn = open_conn(&db_path).expect("open sqlite");
563        let (tid, status, cycle, unresolved, total, failed, runs): (
564            String, String, i64, i64, i64, i64, i64,
565        ) = conn
566            .query_row(
567                "SELECT task_id, status, current_cycle, unresolved_items, total_items, failed_items, command_runs FROM task_execution_metrics WHERE task_id = ?1",
568                params!["task-123"],
569                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?, row.get(5)?, row.get(6)?)),
570            )
571            .expect("query metric");
572
573        assert_eq!(tid, "task-123");
574        assert_eq!(status, "running");
575        assert_eq!(cycle, 2);
576        assert_eq!(unresolved, 3);
577        assert_eq!(total, 10);
578        assert_eq!(failed, 1);
579        assert_eq!(runs, 5);
580    }
581
582    // ── reset_db ──
583
584    #[test]
585    fn reset_db_clears_data() {
586        let mut fixture = TestState::new();
587        let state = fixture.build();
588
589        let qa_file = state
590            .data_dir
591            .join("workspace/default/docs/qa/reset_test.md");
592        std::fs::write(&qa_file, "# reset test\n").expect("seed qa file");
593
594        create_task_impl(&state, CreateTaskPayload::default()).expect("create task");
595
596        // Confirm task exists
597        let conn = open_conn(&state.db_path).expect("open sqlite");
598        let before: i64 = conn
599            .query_row("SELECT COUNT(*) FROM tasks", [], |row| row.get(0))
600            .expect("count before");
601        assert!(before > 0);
602        drop(conn);
603
604        reset_db(&state, false, false).expect("reset_db");
605
606        let conn = open_conn(&state.db_path).expect("open sqlite");
607        let after: i64 = conn
608            .query_row("SELECT COUNT(*) FROM tasks", [], |row| row.get(0))
609            .expect("count after");
610        assert_eq!(after, 0);
611    }
612
613    #[test]
614    fn reset_db_with_config_clears_config() {
615        let mut fixture = TestState::new();
616        let state = fixture.build();
617
618        // Confirm config versions exist in the active config history table.
619        let conn = open_conn(&state.db_path).expect("open sqlite");
620        let versions_before: i64 = conn
621            .query_row(
622                "SELECT COUNT(*) FROM orchestrator_config_versions",
623                [],
624                |row| row.get(0),
625            )
626            .expect("count config versions before");
627        assert!(versions_before > 0);
628        drop(conn);
629
630        reset_db(&state, false, true).expect("reset_db with config");
631
632        let conn = open_conn(&state.db_path).expect("open sqlite");
633        let versions_after: i64 = conn
634            .query_row(
635                "SELECT COUNT(*) FROM orchestrator_config_versions",
636                [],
637                |row| row.get(0),
638            )
639            .expect("count config versions after");
640        assert_eq!(versions_after, 0);
641    }
642
643    #[test]
644    fn reset_db_blocked_when_running_task_exists() {
645        let mut fixture = TestState::new();
646        let state = fixture.build();
647
648        let qa_file = state
649            .data_dir
650            .join("workspace/default/docs/qa/guard_test.md");
651        std::fs::write(&qa_file, "# guard test\n").expect("seed qa file");
652
653        let task = create_task_impl(&state, CreateTaskPayload::default()).expect("create task");
654
655        // Simulate running status
656        let conn = open_conn(&state.db_path).expect("open sqlite");
657        conn.execute(
658            "UPDATE tasks SET status = 'running' WHERE id = ?1",
659            params![task.id],
660        )
661        .expect("set task running");
662        drop(conn);
663
664        let result = reset_db(&state, false, false);
665        assert!(result.is_err());
666        let err = result.expect_err("should be blocked").to_string();
667        assert!(
668            err.contains("db reset blocked"),
669            "unexpected error: {}",
670            err
671        );
672    }
673
674    #[test]
675    fn reset_db_blocked_when_restart_pending_task_exists() {
676        let mut fixture = TestState::new();
677        let state = fixture.build();
678
679        let qa_file = state
680            .data_dir
681            .join("workspace/default/docs/qa/restart_guard.md");
682        std::fs::write(&qa_file, "# restart guard\n").expect("seed qa file");
683
684        let task = create_task_impl(&state, CreateTaskPayload::default()).expect("create task");
685
686        let conn = open_conn(&state.db_path).expect("open sqlite");
687        conn.execute(
688            "UPDATE tasks SET status = 'restart_pending' WHERE id = ?1",
689            params![task.id],
690        )
691        .expect("set task restart_pending");
692        drop(conn);
693
694        let result = reset_db(&state, false, false);
695        assert!(result.is_err());
696        assert!(
697            result
698                .expect_err("should be blocked")
699                .to_string()
700                .contains("db reset blocked")
701        );
702    }
703
704    // ── reset_project_data ──
705
706    #[test]
707    fn reset_project_data_returns_zero_stats_for_unknown_project() {
708        let mut fixture = TestState::new();
709        let state = fixture.build();
710
711        let stats = reset_project_data(&state, "nonexistent-project").expect("reset project data");
712        assert_eq!(
713            stats,
714            ProjectResetStats {
715                tasks: 0,
716                task_items: 0,
717                command_runs: 0,
718                events: 0,
719                tickets_cleaned: 0,
720            }
721        );
722    }
723
724    // ── ProjectResetStats ──
725
726    #[test]
727    fn project_reset_stats_debug_and_eq() {
728        let a = ProjectResetStats {
729            tasks: 1,
730            task_items: 2,
731            command_runs: 3,
732            events: 4,
733            tickets_cleaned: 0,
734        };
735        let b = a;
736        assert_eq!(a, b);
737        // Debug should work
738        let _debug = format!("{:?}", a);
739    }
740
741    // ── list_non_terminal_tasks_by_workspace ──
742
743    #[test]
744    fn list_non_terminal_tasks_by_workspace_empty() {
745        let (_dir, db_path) = tmp_db_path();
746        init_schema(&db_path).expect("init_schema");
747
748        let conn = open_conn(&db_path).expect("open_conn");
749        let tasks =
750            list_non_terminal_tasks_by_workspace(&conn, "default", "ws1", 10).expect("list empty");
751        assert!(tasks.is_empty());
752    }
753
754    #[test]
755    fn list_non_terminal_tasks_by_workspace_returns_matching() {
756        let mut fixture = TestState::new();
757        let state = fixture.build();
758
759        let qa_file = state
760            .data_dir
761            .join("workspace/default/docs/qa/list_ws_test.md");
762        std::fs::write(&qa_file, "# list ws test\n").expect("seed qa file");
763
764        create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
765        create_task_impl(&state, CreateTaskPayload::default()).expect("task 2");
766
767        let conn = open_conn(&state.db_path).expect("open sqlite");
768        let tasks = list_non_terminal_tasks_by_workspace(
769            &conn,
770            crate::config::DEFAULT_PROJECT_ID,
771            "default",
772            10,
773        )
774        .expect("list");
775        assert_eq!(tasks.len(), 2);
776        assert_eq!(tasks[0].status, "created");
777        assert_eq!(tasks[1].status, "created");
778    }
779
780    #[test]
781    fn list_non_terminal_tasks_by_workspace_respects_limit() {
782        let mut fixture = TestState::new();
783        let state = fixture.build();
784
785        let qa_file = state
786            .data_dir
787            .join("workspace/default/docs/qa/limit_ws_test.md");
788        std::fs::write(&qa_file, "# limit ws test\n").expect("seed qa file");
789
790        create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
791        create_task_impl(&state, CreateTaskPayload::default()).expect("task 2");
792        create_task_impl(&state, CreateTaskPayload::default()).expect("task 3");
793
794        let conn = open_conn(&state.db_path).expect("open sqlite");
795        let tasks = list_non_terminal_tasks_by_workspace(
796            &conn,
797            crate::config::DEFAULT_PROJECT_ID,
798            "default",
799            2,
800        )
801        .expect("list limited");
802        assert_eq!(tasks.len(), 2);
803    }
804
805    #[test]
806    fn list_non_terminal_tasks_by_workspace_excludes_terminal() {
807        let mut fixture = TestState::new();
808        let state = fixture.build();
809
810        let qa_file = state
811            .data_dir
812            .join("workspace/default/docs/qa/terminal_ws_test.md");
813        std::fs::write(&qa_file, "# terminal ws test\n").expect("seed qa file");
814
815        let task = create_task_impl(&state, CreateTaskPayload::default()).expect("task");
816
817        // Mark as completed (terminal)
818        let conn = open_conn(&state.db_path).expect("open sqlite");
819        conn.execute(
820            "UPDATE tasks SET status = 'completed' WHERE id = ?1",
821            params![task.id],
822        )
823        .expect("set task completed");
824
825        let tasks = list_non_terminal_tasks_by_workspace(
826            &conn,
827            crate::config::DEFAULT_PROJECT_ID,
828            "default",
829            10,
830        )
831        .expect("list");
832        assert!(tasks.is_empty());
833    }
834
835    // ── list_non_terminal_tasks_by_workflow ──
836
837    #[test]
838    fn list_non_terminal_tasks_by_workflow_empty() {
839        let (_dir, db_path) = tmp_db_path();
840        init_schema(&db_path).expect("init_schema");
841
842        let conn = open_conn(&db_path).expect("open_conn");
843        let tasks =
844            list_non_terminal_tasks_by_workflow(&conn, "default", "wf1", 10).expect("list empty");
845        assert!(tasks.is_empty());
846    }
847
848    #[test]
849    fn list_non_terminal_tasks_by_workflow_returns_matching() {
850        let mut fixture = TestState::new();
851        let state = fixture.build();
852
853        let qa_file = state
854            .data_dir
855            .join("workspace/default/docs/qa/list_wf_test.md");
856        std::fs::write(&qa_file, "# list wf test\n").expect("seed qa file");
857
858        create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
859
860        let conn = open_conn(&state.db_path).expect("open sqlite");
861        let tasks = list_non_terminal_tasks_by_workflow(
862            &conn,
863            crate::config::DEFAULT_PROJECT_ID,
864            "basic",
865            10,
866        )
867        .expect("list");
868        assert_eq!(tasks.len(), 1);
869        assert_eq!(tasks[0].status, "created");
870    }
871
872    #[test]
873    fn list_non_terminal_tasks_by_workflow_respects_limit() {
874        let mut fixture = TestState::new();
875        let state = fixture.build();
876
877        let qa_file = state
878            .data_dir
879            .join("workspace/default/docs/qa/limit_wf_test.md");
880        std::fs::write(&qa_file, "# limit wf test\n").expect("seed qa file");
881
882        create_task_impl(&state, CreateTaskPayload::default()).expect("task 1");
883        create_task_impl(&state, CreateTaskPayload::default()).expect("task 2");
884        create_task_impl(&state, CreateTaskPayload::default()).expect("task 3");
885
886        let conn = open_conn(&state.db_path).expect("open sqlite");
887        let tasks = list_non_terminal_tasks_by_workflow(
888            &conn,
889            crate::config::DEFAULT_PROJECT_ID,
890            "basic",
891            1,
892        )
893        .expect("list limited");
894        assert_eq!(tasks.len(), 1);
895    }
896
897    // ── insert_control_plane_audit ──
898
899    #[test]
900    fn insert_control_plane_audit_stores_row() {
901        let (_dir, db_path) = tmp_db_path();
902        init_schema(&db_path).expect("init_schema");
903
904        let record = ControlPlaneAuditRecord {
905            transport: "grpc".to_string(),
906            remote_addr: Some("127.0.0.1:5000".to_string()),
907            rpc: "CreateTask".to_string(),
908            subject_id: Some("user-1".to_string()),
909            authn_result: "ok".to_string(),
910            authz_result: "allowed".to_string(),
911            role: Some("admin".to_string()),
912            reason: Some("normal access".to_string()),
913            tls_fingerprint: None,
914            rejection_stage: None,
915            traffic_class: None,
916            limit_scope: None,
917            decision: None,
918            reason_code: None,
919        };
920        insert_control_plane_audit(&db_path, &record).expect("insert audit");
921
922        let conn = open_conn(&db_path).expect("open sqlite");
923        let (transport, rpc, authn, authz): (String, String, String, String) = conn
924            .query_row(
925                "SELECT transport, rpc, authn_result, authz_result FROM control_plane_audit LIMIT 1",
926                [],
927                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
928            )
929            .expect("query audit");
930        assert_eq!(transport, "grpc");
931        assert_eq!(rpc, "CreateTask");
932        assert_eq!(authn, "ok");
933        assert_eq!(authz, "allowed");
934    }
935
936    #[test]
937    fn insert_control_plane_audit_with_none_fields() {
938        let (_dir, db_path) = tmp_db_path();
939        init_schema(&db_path).expect("init_schema");
940
941        let record = ControlPlaneAuditRecord {
942            transport: "uds".to_string(),
943            remote_addr: None,
944            rpc: "ListTasks".to_string(),
945            subject_id: None,
946            authn_result: "skipped".to_string(),
947            authz_result: "skipped".to_string(),
948            role: None,
949            reason: None,
950            tls_fingerprint: None,
951            rejection_stage: None,
952            traffic_class: None,
953            limit_scope: None,
954            decision: None,
955            reason_code: None,
956        };
957        insert_control_plane_audit(&db_path, &record).expect("insert audit with nones");
958
959        let conn = open_conn(&db_path).expect("open sqlite");
960        let count: i64 = conn
961            .query_row("SELECT COUNT(*) FROM control_plane_audit", [], |row| {
962                row.get(0)
963            })
964            .expect("count audit");
965        assert_eq!(count, 1);
966    }
967
968    // ── reset_db include_history branch ──
969
970    #[test]
971    fn reset_db_with_history_keeps_latest_config_version() {
972        let mut fixture = TestState::new();
973        let state = fixture.build();
974
975        // Confirm config versions exist
976        let conn = open_conn(&state.db_path).expect("open sqlite");
977        let versions_before: i64 = conn
978            .query_row(
979                "SELECT COUNT(*) FROM orchestrator_config_versions",
980                [],
981                |row| row.get(0),
982            )
983            .expect("count config versions before");
984        assert!(versions_before > 0);
985        drop(conn);
986
987        // Reset with include_history=true, include_config=false
988        // Should keep only the latest config version
989        reset_db(&state, true, false).expect("reset_db with history");
990
991        let conn = open_conn(&state.db_path).expect("open sqlite");
992        let versions_after: i64 = conn
993            .query_row(
994                "SELECT COUNT(*) FROM orchestrator_config_versions",
995                [],
996                |row| row.get(0),
997            )
998            .expect("count config versions after");
999        // Should keep at most 1 (the latest)
1000        assert!(versions_after <= 1, "Expected <= 1, got {}", versions_after);
1001        // Tasks should be cleared
1002        let tasks: i64 = conn
1003            .query_row("SELECT COUNT(*) FROM tasks", [], |row| row.get(0))
1004            .expect("count tasks");
1005        assert_eq!(tasks, 0);
1006    }
1007
1008    // ── reset_project_data with actual data ──
1009
1010    #[test]
1011    fn reset_project_data_clears_project_data_and_returns_stats() {
1012        let mut fixture = TestState::new();
1013        let state = fixture.build();
1014
1015        let qa_file = state
1016            .data_dir
1017            .join("workspace/default/docs/qa/proj_reset_test.md");
1018        std::fs::write(&qa_file, "# proj reset test\n").expect("seed qa file");
1019
1020        let task = create_task_impl(&state, CreateTaskPayload::default()).expect("create task");
1021
1022        // Verify task exists
1023        let conn = open_conn(&state.db_path).expect("open sqlite");
1024        let task_count: i64 = conn
1025            .query_row(
1026                "SELECT COUNT(*) FROM tasks WHERE project_id = ?1",
1027                params![crate::config::DEFAULT_PROJECT_ID],
1028                |row| row.get(0),
1029            )
1030            .expect("count tasks");
1031        assert!(task_count > 0);
1032
1033        // Insert an event for the task
1034        conn.execute(
1035            "INSERT INTO events (task_id, event_type, payload_json, created_at) VALUES (?1, ?2, ?3, ?4)",
1036            params![task.id, "test", "{}", crate::config_load::now_ts()],
1037        )
1038        .expect("insert event");
1039        drop(conn);
1040
1041        let stats =
1042            reset_project_data(&state, crate::config::DEFAULT_PROJECT_ID).expect("reset project");
1043        assert!(stats.tasks > 0, "expected tasks > 0, got {}", stats.tasks);
1044        assert_eq!(stats.tickets_cleaned, 0); // hardcoded to 0
1045
1046        // Verify data is cleared
1047        let conn = open_conn(&state.db_path).expect("open sqlite after reset");
1048        let task_count_after: i64 = conn
1049            .query_row(
1050                "SELECT COUNT(*) FROM tasks WHERE project_id = ?1",
1051                params![crate::config::DEFAULT_PROJECT_ID],
1052                |row| row.get(0),
1053            )
1054            .expect("count tasks after");
1055        assert_eq!(task_count_after, 0);
1056
1057        let event_count_after: i64 = conn
1058            .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
1059            .expect("count events after");
1060        assert_eq!(event_count_after, 0);
1061    }
1062
1063    // ── count excludes terminal statuses ──
1064
1065    #[test]
1066    fn count_non_terminal_tasks_by_workspace_excludes_completed() {
1067        let mut fixture = TestState::new();
1068        let state = fixture.build();
1069
1070        let qa_file = state
1071            .data_dir
1072            .join("workspace/default/docs/qa/terminal_count_test.md");
1073        std::fs::write(&qa_file, "# terminal count test\n").expect("seed qa file");
1074
1075        let task = create_task_impl(&state, CreateTaskPayload::default()).expect("task");
1076
1077        let conn = open_conn(&state.db_path).expect("open sqlite");
1078        conn.execute(
1079            "UPDATE tasks SET status = 'completed' WHERE id = ?1",
1080            params![task.id],
1081        )
1082        .expect("set completed");
1083
1084        let count = count_non_terminal_tasks_by_workspace(
1085            &conn,
1086            crate::config::DEFAULT_PROJECT_ID,
1087            "default",
1088        )
1089        .expect("count");
1090        assert_eq!(count, 0);
1091    }
1092}