crosslink 0.8.0

A synced issue tracker CLI for multi-agent AI development
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
use anyhow::{Context, Result};
use rusqlite::Connection;
use std::path::Path;

pub const SCHEMA_VERSION: i32 = 16;

/// Valid values for issue priority.
pub const VALID_PRIORITIES: &[&str] = &["low", "medium", "high", "critical"];

/// Valid values for issue status.
pub const VALID_STATUSES: &[&str] = &["open", "closed", "archived"];

/// Maximum lengths for string inputs.
pub const MAX_TITLE_LEN: usize = 512;
pub const MAX_LABEL_LEN: usize = 128;
pub const MAX_DESCRIPTION_LEN: usize = 64 * 1024; // 64KB
pub const MAX_COMMENT_LEN: usize = 1024 * 1024; // 1MB

/// Validate that a status value is known, returning an error if not.
///
/// # Errors
///
/// Returns an error if the status is not one of the valid values.
pub fn validate_status(status: &str) -> Result<()> {
    if VALID_STATUSES.contains(&status) {
        Ok(())
    } else {
        anyhow::bail!(
            "Invalid status '{}'. Valid values: {}",
            status,
            VALID_STATUSES.join(", ")
        )
    }
}

/// Validate that a priority value is known, returning an error if not.
///
/// # Errors
///
/// Returns an error if the priority is not one of the valid values.
pub fn validate_priority(priority: &str) -> Result<()> {
    if VALID_PRIORITIES.contains(&priority) {
        Ok(())
    } else {
        anyhow::bail!(
            "Invalid priority '{}'. Valid values: {}",
            priority,
            VALID_PRIORITIES.join(", ")
        )
    }
}

pub struct Database {
    pub(crate) conn: Connection,
}

impl Database {
    /// Open a database at the given path, initializing the schema if needed.
    ///
    /// # Errors
    /// Returns an error if the database cannot be opened or schema initialization fails.
    pub fn open(path: &Path) -> Result<Self> {
        let conn = Connection::open(path).context("Failed to open database")?;
        let db = Self { conn };
        db.init_schema()?;
        Ok(db)
    }

    /// Execute a closure within a database transaction.
    /// If the closure returns Ok, the transaction is committed.
    /// If the closure returns Err or the closure panics, the transaction is
    /// rolled back automatically via rusqlite's RAII `Transaction` type.
    ///
    /// # Errors
    /// Returns an error if the transaction cannot be started, committed, or if the closure fails.
    pub fn transaction<T, F>(&self, f: F) -> Result<T>
    where
        F: FnOnce() -> Result<T>,
    {
        let tx = self.conn.unchecked_transaction()?;
        let result = f()?;
        tx.commit()?;
        Ok(result)
    }

    /// Toggle `SQLite` foreign key enforcement.
    ///
    /// Must be called outside a transaction (`PRAGMA foreign_keys` is a
    /// no-op inside one). Used by hydration to prevent `ON DELETE` cascades
    /// during bulk clear/reinsert (#461).
    ///
    /// # Errors
    /// Returns an error if the pragma execution fails.
    pub fn set_foreign_keys(&self, enabled: bool) -> Result<()> {
        let value = if enabled { "ON" } else { "OFF" };
        self.conn
            .execute_batch(&format!("PRAGMA foreign_keys = {value};"))?;
        Ok(())
    }

    /// Run a migration statement, logging unexpected errors.
    /// Expected errors (duplicate column, table already exists) are logged at debug level.
    fn migrate(&self, sql: &str) {
        if let Err(e) = self.conn.execute(sql, []) {
            let msg = e.to_string();
            if msg.contains("duplicate column") || msg.contains("already exists") {
                tracing::debug!(
                    "migration skipped (already applied): {}: {}",
                    sql.trim(),
                    msg
                );
            } else {
                tracing::warn!("migration error ({}): {}", sql.trim(), msg);
            }
        }
    }

    /// Run a batch migration statement, logging unexpected errors.
    /// Expected errors (duplicate column, table already exists) are logged at debug level.
    fn migrate_batch(&self, sql: &str) {
        if let Err(e) = self.conn.execute_batch(sql) {
            let msg = e.to_string();
            if msg.contains("duplicate column") || msg.contains("already exists") {
                tracing::debug!("migration batch skipped (already applied): {}", msg);
            } else {
                tracing::warn!("migration batch error: {}", msg);
            }
        }
    }

    fn init_schema(&self) -> Result<()> {
        // Check if we need to initialize
        let version: i32 = self
            .conn
            .query_row(
                "SELECT COALESCE(MAX(user_version), 0) FROM pragma_user_version",
                [],
                |row| row.get(0),
            )
            .unwrap_or_else(|e| {
                tracing::warn!(
                    "failed to read schema version (PRAGMA user_version): {e}, defaulting to 0"
                );
                0
            });

        if version < SCHEMA_VERSION {
            self.create_tables()?;
            self.run_migrations(version);

            self.conn
                .execute(&format!("PRAGMA user_version = {SCHEMA_VERSION}"), [])?;
        }

        // Enable foreign keys
        self.conn.execute("PRAGMA foreign_keys = ON", [])?;

        Ok(())
    }

    fn create_tables(&self) -> Result<()> {
        self.conn.execute_batch(
            r"
                -- Core issues table
                CREATE TABLE IF NOT EXISTS issues (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT NOT NULL,
                    description TEXT,
                    status TEXT NOT NULL DEFAULT 'open',
                    priority TEXT NOT NULL DEFAULT 'medium',
                    parent_id INTEGER,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    closed_at TEXT,
                    FOREIGN KEY (parent_id) REFERENCES issues(id) ON DELETE CASCADE
                );

                -- Labels (many-to-many)
                CREATE TABLE IF NOT EXISTS labels (
                    issue_id INTEGER NOT NULL,
                    label TEXT NOT NULL,
                    PRIMARY KEY (issue_id, label),
                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
                );

                -- Dependencies (blocker blocks blocked)
                CREATE TABLE IF NOT EXISTS dependencies (
                    blocker_id INTEGER NOT NULL,
                    blocked_id INTEGER NOT NULL,
                    PRIMARY KEY (blocker_id, blocked_id),
                    FOREIGN KEY (blocker_id) REFERENCES issues(id) ON DELETE CASCADE,
                    FOREIGN KEY (blocked_id) REFERENCES issues(id) ON DELETE CASCADE
                );

                -- Comments
                CREATE TABLE IF NOT EXISTS comments (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    issue_id INTEGER NOT NULL,
                    content TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
                );

                -- Sessions (for context preservation)
                CREATE TABLE IF NOT EXISTS sessions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    started_at TEXT NOT NULL,
                    ended_at TEXT,
                    active_issue_id INTEGER,
                    handoff_notes TEXT,
                    FOREIGN KEY (active_issue_id) REFERENCES issues(id) ON DELETE SET NULL
                );

                -- Time tracking
                CREATE TABLE IF NOT EXISTS time_entries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    issue_id INTEGER NOT NULL,
                    started_at TEXT NOT NULL,
                    ended_at TEXT,
                    duration_seconds INTEGER,
                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
                );

                -- Relations (related issues, bidirectional)
                CREATE TABLE IF NOT EXISTS relations (
                    issue_id_1 INTEGER NOT NULL,
                    issue_id_2 INTEGER NOT NULL,
                    created_at TEXT NOT NULL,
                    PRIMARY KEY (issue_id_1, issue_id_2),
                    FOREIGN KEY (issue_id_1) REFERENCES issues(id) ON DELETE CASCADE,
                    FOREIGN KEY (issue_id_2) REFERENCES issues(id) ON DELETE CASCADE
                );

                -- Milestones
                CREATE TABLE IF NOT EXISTS milestones (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    description TEXT,
                    status TEXT NOT NULL DEFAULT 'open',
                    created_at TEXT NOT NULL,
                    closed_at TEXT
                );

                -- Milestone-Issue relationship (many-to-many)
                CREATE TABLE IF NOT EXISTS milestone_issues (
                    milestone_id INTEGER NOT NULL,
                    issue_id INTEGER NOT NULL,
                    PRIMARY KEY (milestone_id, issue_id),
                    FOREIGN KEY (milestone_id) REFERENCES milestones(id) ON DELETE CASCADE,
                    FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
                );

                -- Indexes
                CREATE INDEX IF NOT EXISTS idx_issues_status ON issues(status);
                CREATE INDEX IF NOT EXISTS idx_issues_priority ON issues(priority);
                CREATE INDEX IF NOT EXISTS idx_labels_issue ON labels(issue_id);
                CREATE INDEX IF NOT EXISTS idx_comments_issue ON comments(issue_id);
                CREATE INDEX IF NOT EXISTS idx_deps_blocker ON dependencies(blocker_id);
                CREATE INDEX IF NOT EXISTS idx_deps_blocked ON dependencies(blocked_id);
                CREATE INDEX IF NOT EXISTS idx_issues_parent ON issues(parent_id);
                CREATE INDEX IF NOT EXISTS idx_time_entries_issue ON time_entries(issue_id);
                CREATE INDEX IF NOT EXISTS idx_relations_1 ON relations(issue_id_1);
                CREATE INDEX IF NOT EXISTS idx_relations_2 ON relations(issue_id_2);
                CREATE INDEX IF NOT EXISTS idx_milestone_issues_m ON milestone_issues(milestone_id);
                CREATE INDEX IF NOT EXISTS idx_milestone_issues_i ON milestone_issues(issue_id);
                ",
        )?;
        Ok(())
    }

    fn run_migrations(&self, version: i32) {
        // Migration: add parent_id column if upgrading from v1
        self.migrate(
            "ALTER TABLE issues ADD COLUMN parent_id INTEGER REFERENCES issues(id) ON DELETE CASCADE",
        );

        // Migration v7: Recreate sessions table with ON DELETE SET NULL for active_issue_id
        // This ensures deleting an issue clears the session reference instead of failing
        if version < 7 {
            self.migrate_batch(
                r"
                    DROP TABLE IF EXISTS sessions_new;
                    CREATE TABLE sessions_new (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        started_at TEXT NOT NULL,
                        ended_at TEXT,
                        active_issue_id INTEGER,
                        handoff_notes TEXT,
                        FOREIGN KEY (active_issue_id) REFERENCES issues(id) ON DELETE SET NULL
                    );
                    INSERT OR IGNORE INTO sessions_new (id, started_at, ended_at, active_issue_id, handoff_notes)
                        SELECT id, started_at, ended_at, active_issue_id, handoff_notes FROM sessions;
                    DROP TABLE IF EXISTS sessions;
                    ALTER TABLE sessions_new RENAME TO sessions;
                    ",
            );
        }

        // Migration v8: Add last_action column to sessions table
        if version < 8 {
            self.migrate("ALTER TABLE sessions ADD COLUMN last_action TEXT");
        }

        // Migration v9: Add agent_id column to sessions table
        if version < 9 {
            self.migrate("ALTER TABLE sessions ADD COLUMN agent_id TEXT");
        }

        // Migration v10: Add uuid columns for shared issue coordination
        if version < 10 {
            self.migrate("ALTER TABLE issues ADD COLUMN uuid TEXT");
            self.migrate("CREATE UNIQUE INDEX IF NOT EXISTS idx_issues_uuid ON issues(uuid)");
            self.migrate("ALTER TABLE issues ADD COLUMN created_by TEXT");
            self.migrate("ALTER TABLE comments ADD COLUMN uuid TEXT");
            self.migrate("ALTER TABLE comments ADD COLUMN author TEXT");
            self.migrate("ALTER TABLE milestones ADD COLUMN uuid TEXT");
            self.migrate(
                "CREATE UNIQUE INDEX IF NOT EXISTS idx_milestones_uuid ON milestones(uuid)",
            );
        }

        // Migration v11: Add kind column to comments for typed audit trail
        if version < 11 {
            self.migrate("ALTER TABLE comments ADD COLUMN kind TEXT DEFAULT 'note'");
        }

        // Migration v12: Add trigger_type and intervention_context for driver intervention tracking
        if version < 12 {
            self.migrate("ALTER TABLE comments ADD COLUMN trigger_type TEXT");
            self.migrate("ALTER TABLE comments ADD COLUMN intervention_context TEXT");
        }

        // Migration v13: Add driver_key_fingerprint to comments for audit trail
        if version < 13 {
            let _ = self.conn.execute(
                "ALTER TABLE comments ADD COLUMN driver_key_fingerprint TEXT",
                [],
            );
        }

        // Migration v14: Drop leftover sessions_new table from a bug where
        // user_version was always read as 0 (wrong column name in the query),
        // causing the v7 migration to re-run on every open and leave behind
        // a stale sessions_new table.
        if version < 14 {
            self.migrate("DROP TABLE IF EXISTS sessions_new");
        }

        // Migration v15: Token usage tracking table for web dashboard
        if version < 15 {
            self.migrate_batch(
                r"
                    CREATE TABLE IF NOT EXISTS token_usage (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        agent_id TEXT NOT NULL,
                        session_id INTEGER,
                        timestamp TEXT NOT NULL,
                        input_tokens INTEGER NOT NULL DEFAULT 0,
                        output_tokens INTEGER NOT NULL DEFAULT 0,
                        cache_read_tokens INTEGER,
                        cache_creation_tokens INTEGER,
                        model TEXT NOT NULL DEFAULT 'unknown',
                        cost_estimate REAL,
                        FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE SET NULL
                    );
                    CREATE INDEX IF NOT EXISTS idx_token_usage_agent ON token_usage(agent_id);
                    CREATE INDEX IF NOT EXISTS idx_token_usage_session ON token_usage(session_id);
                    CREATE INDEX IF NOT EXISTS idx_token_usage_timestamp ON token_usage(timestamp);
                    ",
            );
        }

        // Migration v16: Sentinel autonomous maintenance tables
        if version < 16 {
            self.migrate_batch(
                r"
                    CREATE TABLE IF NOT EXISTS sentinel_runs (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        run_id TEXT NOT NULL UNIQUE,
                        started_at TEXT NOT NULL,
                        completed_at TEXT,
                        mode TEXT NOT NULL,
                        signals_found INTEGER DEFAULT 0,
                        dispatched INTEGER DEFAULT 0,
                        collected INTEGER DEFAULT 0,
                        triaged INTEGER DEFAULT 0,
                        skipped INTEGER DEFAULT 0,
                        deferred INTEGER DEFAULT 0
                    );

                    CREATE TABLE IF NOT EXISTS sentinel_dispatches (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        run_id TEXT NOT NULL,
                        signal_ref TEXT NOT NULL,
                        signal_title TEXT NOT NULL,
                        source TEXT NOT NULL,
                        disposition TEXT NOT NULL,
                        agent_id TEXT,
                        crosslink_issue_id INTEGER,
                        gh_issue_number INTEGER,
                        label TEXT NOT NULL,
                        attempt_number INTEGER DEFAULT 1,
                        model_used TEXT,
                        outcome TEXT DEFAULT 'pending',
                        outcome_detail TEXT,
                        created_at TEXT NOT NULL,
                        completed_at TEXT,
                        FOREIGN KEY (crosslink_issue_id) REFERENCES issues(id)
                    );

                    CREATE INDEX IF NOT EXISTS idx_sentinel_dispatches_signal_ref
                        ON sentinel_dispatches(signal_ref);
                    CREATE INDEX IF NOT EXISTS idx_sentinel_dispatches_outcome
                        ON sentinel_dispatches(outcome);
                    CREATE INDEX IF NOT EXISTS idx_sentinel_dispatches_run_id
                        ON sentinel_dispatches(run_id);
                    CREATE INDEX IF NOT EXISTS idx_sentinel_dispatches_gh_label
                        ON sentinel_dispatches(gh_issue_number, label);
                    ",
            );
        }
    }

    /// Get the current schema version (PRAGMA `user_version`).
    ///
    /// # Errors
    /// Returns an error if the pragma query fails.
    pub fn get_schema_version(&self) -> Result<i32> {
        let version: i32 = self
            .conn
            .query_row("PRAGMA user_version", [], |row| row.get(0))?;
        Ok(version)
    }
}