Skip to main content

chainlink/db/
mod.rs

1mod archive;
2mod comments;
3mod dependencies;
4mod issues;
5mod labels;
6mod milestones;
7mod relations;
8mod sessions;
9mod time_entries;
10mod token_usage_db;
11
12use anyhow::{Context, Result};
13use chrono::{DateTime, Utc};
14use rusqlite::Connection;
15use std::path::Path;
16
17use crate::models::Issue;
18
/// Schema version written to `PRAGMA user_version` once initialization and
/// all migrations have run.
const SCHEMA_VERSION: i32 = 12;

/// Priority values accepted by [`validate_priority`].
pub const VALID_PRIORITIES: &[&str] = &["low", "medium", "high", "critical"];

/// Status values accepted by [`validate_status`].
pub const VALID_STATUSES: &[&str] = &["open", "closed", "archived"];

/// Maximum length of an issue title.
pub const MAX_TITLE_LEN: usize = 512;
/// Maximum length of a label.
pub const MAX_LABEL_LEN: usize = 128;
/// Maximum length of an issue description (64KB).
pub const MAX_DESCRIPTION_LEN: usize = 64 * 1024;
/// Maximum length of a comment (1MB).
pub const MAX_COMMENT_LEN: usize = 1024 * 1024;
32
33/// Validate that a status value is known, returning an error if not.
34pub fn validate_status(status: &str) -> Result<()> {
35    if VALID_STATUSES.contains(&status) {
36        Ok(())
37    } else {
38        anyhow::bail!(
39            "Invalid status '{}'. Valid values: {}",
40            status,
41            VALID_STATUSES.join(", ")
42        )
43    }
44}
45
46/// Validate that a priority value is known, returning an error if not.
47pub fn validate_priority(priority: &str) -> Result<()> {
48    if VALID_PRIORITIES.contains(&priority) {
49        Ok(())
50    } else {
51        anyhow::bail!(
52            "Invalid priority '{}'. Valid values: {}",
53            priority,
54            VALID_PRIORITIES.join(", ")
55        )
56    }
57}
58
/// Handle to the SQLite database that stores issues and their related
/// records (labels, comments, sessions, ...).
///
/// Wraps a single `rusqlite` [`Connection`]; create one with
/// [`Database::open`], which also initializes and migrates the schema.
pub struct Database {
    /// Underlying SQLite connection, accessible to the rest of the crate.
    pub(crate) conn: Connection,
}
62
63impl Database {
64    pub fn open(path: &Path) -> Result<Self> {
65        let conn = Connection::open(path).context("Failed to open database")?;
66        let db = Database { conn };
67        db.init_schema()?;
68        Ok(db)
69    }
70
71    /// Execute a closure within a database transaction.
72    /// If the closure returns Ok, the transaction is committed.
73    /// If the closure returns Err, the transaction is rolled back.
74    pub fn transaction<T, F>(&self, f: F) -> Result<T>
75    where
76        F: FnOnce() -> Result<T>,
77    {
78        self.conn.execute("BEGIN TRANSACTION", [])?;
79        match f() {
80            Ok(result) => {
81                self.conn.execute("COMMIT", [])?;
82                Ok(result)
83            }
84            Err(e) => {
85                if let Err(rollback_err) = self.conn.execute("ROLLBACK", []) {
86                    tracing::warn!("ROLLBACK failed: {}", rollback_err);
87                }
88                Err(e)
89            }
90        }
91    }
92
93    /// Run a migration statement, logging unexpected errors.
94    /// Expected errors (duplicate column, table already exists) are logged at debug level.
95    fn migrate(&self, sql: &str) {
96        if let Err(e) = self.conn.execute(sql, []) {
97            let msg = e.to_string();
98            if msg.contains("duplicate column") || msg.contains("already exists") {
99                tracing::debug!("migration skipped (already applied): {}", msg);
100            } else {
101                tracing::warn!("migration error ({}): {}", sql.trim(), msg);
102            }
103        }
104    }
105
106    /// Run a batch migration statement, logging unexpected errors.
107    fn migrate_batch(&self, sql: &str) {
108        if let Err(e) = self.conn.execute_batch(sql) {
109            let msg = e.to_string();
110            if msg.contains("duplicate column") || msg.contains("already exists") {
111                tracing::debug!("migration batch skipped (already applied): {}", msg);
112            } else {
113                tracing::warn!("migration batch error: {}", msg);
114            }
115        }
116    }
117
118    fn init_schema(&self) -> Result<()> {
119        // Check if we need to initialize
120        let version: i32 = self
121            .conn
122            .query_row(
123                "SELECT COALESCE(MAX(user_version), 0) FROM pragma_user_version",
124                [],
125                |row| row.get(0),
126            )
127            .unwrap_or(0);
128
129        if version < SCHEMA_VERSION {
130            self.conn.execute_batch(
131                r#"
132                -- Core issues table
133                CREATE TABLE IF NOT EXISTS issues (
134                    id INTEGER PRIMARY KEY AUTOINCREMENT,
135                    title TEXT NOT NULL,
136                    description TEXT,
137                    status TEXT NOT NULL DEFAULT 'open',
138                    priority TEXT NOT NULL DEFAULT 'medium',
139                    parent_id INTEGER,
140                    created_at TEXT NOT NULL,
141                    updated_at TEXT NOT NULL,
142                    closed_at TEXT,
143                    FOREIGN KEY (parent_id) REFERENCES issues(id) ON DELETE CASCADE
144                );
145
146                -- Labels (many-to-many)
147                CREATE TABLE IF NOT EXISTS labels (
148                    issue_id INTEGER NOT NULL,
149                    label TEXT NOT NULL,
150                    PRIMARY KEY (issue_id, label),
151                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
152                );
153
154                -- Dependencies (blocker blocks blocked)
155                CREATE TABLE IF NOT EXISTS dependencies (
156                    blocker_id INTEGER NOT NULL,
157                    blocked_id INTEGER NOT NULL,
158                    PRIMARY KEY (blocker_id, blocked_id),
159                    FOREIGN KEY (blocker_id) REFERENCES issues(id) ON DELETE CASCADE,
160                    FOREIGN KEY (blocked_id) REFERENCES issues(id) ON DELETE CASCADE
161                );
162
163                -- Comments
164                CREATE TABLE IF NOT EXISTS comments (
165                    id INTEGER PRIMARY KEY AUTOINCREMENT,
166                    issue_id INTEGER NOT NULL,
167                    content TEXT NOT NULL,
168                    created_at TEXT NOT NULL,
169                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
170                );
171
172                -- Sessions (for context preservation)
173                CREATE TABLE IF NOT EXISTS sessions (
174                    id INTEGER PRIMARY KEY AUTOINCREMENT,
175                    started_at TEXT NOT NULL,
176                    ended_at TEXT,
177                    active_issue_id INTEGER,
178                    handoff_notes TEXT,
179                    FOREIGN KEY (active_issue_id) REFERENCES issues(id) ON DELETE SET NULL
180                );
181
182                -- Time tracking
183                CREATE TABLE IF NOT EXISTS time_entries (
184                    id INTEGER PRIMARY KEY AUTOINCREMENT,
185                    issue_id INTEGER NOT NULL,
186                    started_at TEXT NOT NULL,
187                    ended_at TEXT,
188                    duration_seconds INTEGER,
189                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
190                );
191
192                -- Relations (related issues, bidirectional)
193                CREATE TABLE IF NOT EXISTS relations (
194                    issue_id_1 INTEGER NOT NULL,
195                    issue_id_2 INTEGER NOT NULL,
196                    created_at TEXT NOT NULL,
197                    PRIMARY KEY (issue_id_1, issue_id_2),
198                    FOREIGN KEY (issue_id_1) REFERENCES issues(id) ON DELETE CASCADE,
199                    FOREIGN KEY (issue_id_2) REFERENCES issues(id) ON DELETE CASCADE
200                );
201
202                -- Milestones
203                CREATE TABLE IF NOT EXISTS milestones (
204                    id INTEGER PRIMARY KEY AUTOINCREMENT,
205                    name TEXT NOT NULL,
206                    description TEXT,
207                    status TEXT NOT NULL DEFAULT 'open',
208                    created_at TEXT NOT NULL,
209                    closed_at TEXT
210                );
211
212                -- Milestone-Issue relationship (many-to-many)
213                CREATE TABLE IF NOT EXISTS milestone_issues (
214                    milestone_id INTEGER NOT NULL,
215                    issue_id INTEGER NOT NULL,
216                    PRIMARY KEY (milestone_id, issue_id),
217                    FOREIGN KEY (milestone_id) REFERENCES milestones(id) ON DELETE CASCADE,
218                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
219                );
220
221                -- Indexes
222                CREATE INDEX IF NOT EXISTS idx_issues_status ON issues(status);
223                CREATE INDEX IF NOT EXISTS idx_issues_priority ON issues(priority);
224                CREATE INDEX IF NOT EXISTS idx_labels_issue ON labels(issue_id);
225                CREATE INDEX IF NOT EXISTS idx_comments_issue ON comments(issue_id);
226                CREATE INDEX IF NOT EXISTS idx_deps_blocker ON dependencies(blocker_id);
227                CREATE INDEX IF NOT EXISTS idx_deps_blocked ON dependencies(blocked_id);
228                CREATE INDEX IF NOT EXISTS idx_issues_parent ON issues(parent_id);
229                CREATE INDEX IF NOT EXISTS idx_time_entries_issue ON time_entries(issue_id);
230                CREATE INDEX IF NOT EXISTS idx_relations_1 ON relations(issue_id_1);
231                CREATE INDEX IF NOT EXISTS idx_relations_2 ON relations(issue_id_2);
232                CREATE INDEX IF NOT EXISTS idx_milestone_issues_m ON milestone_issues(milestone_id);
233                CREATE INDEX IF NOT EXISTS idx_milestone_issues_i ON milestone_issues(issue_id);
234                "#,
235            )?;
236
237            // Migration: add parent_id column if upgrading from v1
238            self.migrate(
239                "ALTER TABLE issues ADD COLUMN parent_id INTEGER REFERENCES issues(id) ON DELETE CASCADE",
240            );
241
242            // Migration v7: Recreate sessions table with ON DELETE SET NULL for active_issue_id
243            if version < 7 {
244                self.migrate_batch(
245                    r#"
246                    CREATE TABLE IF NOT EXISTS sessions_new (
247                        id INTEGER PRIMARY KEY AUTOINCREMENT,
248                        started_at TEXT NOT NULL,
249                        ended_at TEXT,
250                        active_issue_id INTEGER,
251                        handoff_notes TEXT,
252                        FOREIGN KEY (active_issue_id) REFERENCES issues(id) ON DELETE SET NULL
253                    );
254                    INSERT OR IGNORE INTO sessions_new SELECT * FROM sessions;
255                    DROP TABLE IF EXISTS sessions;
256                    ALTER TABLE sessions_new RENAME TO sessions;
257                    "#,
258                );
259            }
260
261            // Migration v8: Add last_action column to sessions table
262            if version < 8 {
263                self.migrate("ALTER TABLE sessions ADD COLUMN last_action TEXT");
264            }
265
266            // Migration v9: Drop leftover sessions_new table from a bug where
267            // user_version was always read as 0 (wrong column name in the query),
268            // causing the v7 migration to re-run on every open and leave behind
269            // a stale sessions_new table.
270            if version < 9 {
271                self.migrate("DROP TABLE IF EXISTS sessions_new");
272            }
273
274            // Migration v10: Add kind column to comments for typed audit trail
275            if version < 10 {
276                self.migrate("ALTER TABLE comments ADD COLUMN kind TEXT DEFAULT 'note'");
277            }
278
279            // Migration v11: Token usage tracking table
280            if version < 11 {
281                self.migrate_batch(
282                    r#"
283                    CREATE TABLE IF NOT EXISTS token_usage (
284                        id INTEGER PRIMARY KEY AUTOINCREMENT,
285                        agent_id TEXT NOT NULL,
286                        session_id INTEGER,
287                        timestamp TEXT NOT NULL,
288                        input_tokens INTEGER NOT NULL DEFAULT 0,
289                        output_tokens INTEGER NOT NULL DEFAULT 0,
290                        cache_read_tokens INTEGER,
291                        cache_creation_tokens INTEGER,
292                        model TEXT NOT NULL DEFAULT 'unknown',
293                        cost_estimate REAL,
294                        FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE SET NULL
295                    );
296                    CREATE INDEX IF NOT EXISTS idx_token_usage_agent ON token_usage(agent_id);
297                    CREATE INDEX IF NOT EXISTS idx_token_usage_session ON token_usage(session_id);
298                    CREATE INDEX IF NOT EXISTS idx_token_usage_timestamp ON token_usage(timestamp);
299                    "#,
300                );
301            }
302
303            // Migration v12: Add agent_id column to sessions for multi-agent tracking
304            if version < 12 {
305                self.migrate("ALTER TABLE sessions ADD COLUMN agent_id TEXT");
306            }
307
308            self.conn
309                .execute(&format!("PRAGMA user_version = {}", SCHEMA_VERSION), [])?;
310        }
311
312        // Enable foreign keys
313        self.conn.execute("PRAGMA foreign_keys = ON", [])?;
314
315        Ok(())
316    }
317}
318
319pub(crate) fn parse_datetime(s: String) -> DateTime<Utc> {
320    DateTime::parse_from_rfc3339(&s)
321        .map(|dt| dt.with_timezone(&Utc))
322        .unwrap_or_else(|_| Utc::now())
323}
324
325/// Maps a database row to an Issue struct.
326/// Expects columns in order: id, title, description, status, priority, parent_id, created_at, updated_at, closed_at
327pub(crate) fn issue_from_row(row: &rusqlite::Row) -> rusqlite::Result<Issue> {
328    Ok(Issue {
329        id: row.get(0)?,
330        title: row.get(1)?,
331        description: row.get(2)?,
332        status: row.get(3)?,
333        priority: row.get(4)?,
334        parent_id: row.get(5)?,
335        created_at: parse_datetime(row.get::<_, String>(6)?),
336        updated_at: parse_datetime(row.get::<_, String>(7)?),
337        closed_at: row.get::<_, Option<String>>(8)?.map(parse_datetime),
338    })
339}
340
341#[cfg(test)]
342mod tests;
343
344#[cfg(test)]
345mod proptest_tests;