// retro_core/db.rs

1use crate::errors::CoreError;
2use crate::models::{
3    IngestedSession, KnowledgeEdge, KnowledgeNode, KnowledgeProject,
4    EdgeType, GraphOperation, NodeScope, NodeStatus, NodeType,
5    Pattern, PatternStatus, PatternType, Projection, ProjectionStatus, SuggestedTarget,
6};
7use chrono::{DateTime, Utc};
8pub use rusqlite::Connection;
9use rusqlite::params;
10use rusqlite::OptionalExtension;
11use std::path::Path;
12
/// Schema version constant for this module.
///
/// NOTE(review): `migrate()` in this file advances `user_version` up to 6,
/// while this constant is 5 — confirm whether it should be bumped.
const SCHEMA_VERSION: u32 = 5;
14
15/// Open (or create) the retro database with WAL mode enabled.
16pub fn open_db(path: &Path) -> Result<Connection, CoreError> {
17    let conn = Connection::open(path)?;
18
19    // Enable WAL mode for concurrent access
20    conn.pragma_update(None, "journal_mode", "WAL")?;
21
22    // Run migrations
23    migrate(&conn)?;
24
25    Ok(conn)
26}
27
28/// Initialize schema on an existing connection (for testing with in-memory DBs).
29pub fn init_db(conn: &Connection) -> Result<(), CoreError> {
30    migrate(conn)
31}
32
33fn migrate(conn: &Connection) -> Result<(), CoreError> {
34    let current_version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0))?;
35
36    if current_version < 1 {
37        conn.execute_batch(
38            "
39            CREATE TABLE IF NOT EXISTS patterns (
40                id TEXT PRIMARY KEY,
41                pattern_type TEXT NOT NULL,
42                description TEXT NOT NULL,
43                confidence REAL NOT NULL,
44                times_seen INTEGER NOT NULL DEFAULT 1,
45                first_seen TEXT NOT NULL,
46                last_seen TEXT NOT NULL,
47                last_projected TEXT,
48                status TEXT NOT NULL DEFAULT 'discovered',
49                source_sessions TEXT NOT NULL,
50                related_files TEXT NOT NULL,
51                suggested_content TEXT NOT NULL,
52                suggested_target TEXT NOT NULL,
53                project TEXT,
54                generation_failed INTEGER NOT NULL DEFAULT 0
55            );
56
57            CREATE TABLE IF NOT EXISTS projections (
58                id TEXT PRIMARY KEY,
59                pattern_id TEXT NOT NULL REFERENCES patterns(id),
60                target_type TEXT NOT NULL,
61                target_path TEXT NOT NULL,
62                content TEXT NOT NULL,
63                applied_at TEXT NOT NULL,
64                pr_url TEXT,
65                nudged INTEGER NOT NULL DEFAULT 0
66            );
67
68            CREATE TABLE IF NOT EXISTS analyzed_sessions (
69                session_id TEXT PRIMARY KEY,
70                project TEXT NOT NULL,
71                analyzed_at TEXT NOT NULL
72            );
73
74            CREATE TABLE IF NOT EXISTS ingested_sessions (
75                session_id TEXT PRIMARY KEY,
76                project TEXT NOT NULL,
77                session_path TEXT NOT NULL,
78                file_size INTEGER NOT NULL,
79                file_mtime TEXT NOT NULL,
80                ingested_at TEXT NOT NULL
81            );
82
83            CREATE INDEX IF NOT EXISTS idx_patterns_status ON patterns(status);
84            CREATE INDEX IF NOT EXISTS idx_patterns_type ON patterns(pattern_type);
85            CREATE INDEX IF NOT EXISTS idx_patterns_target ON patterns(suggested_target);
86            CREATE INDEX IF NOT EXISTS idx_patterns_project ON patterns(project);
87            CREATE INDEX IF NOT EXISTS idx_projections_pattern ON projections(pattern_id);
88            ",
89        )?;
90
91        conn.pragma_update(None, "user_version", 1)?;
92    }
93
94    if current_version < 2 {
95        conn.execute_batch(
96            "
97            CREATE TABLE IF NOT EXISTS metadata (
98                key TEXT PRIMARY KEY,
99                value TEXT NOT NULL
100            );
101            ",
102        )?;
103        conn.pragma_update(None, "user_version", 2)?;
104    }
105
106    if current_version < 3 {
107        conn.execute_batch(
108            "ALTER TABLE projections ADD COLUMN status TEXT NOT NULL DEFAULT 'applied';",
109        )?;
110        conn.pragma_update(None, "user_version", 3)?;
111    }
112
113    if current_version < 4 {
114        conn.execute_batch(
115            "
116            CREATE TABLE IF NOT EXISTS nodes (
117                id TEXT PRIMARY KEY,
118                type TEXT NOT NULL,
119                scope TEXT NOT NULL,
120                project_id TEXT,
121                content TEXT NOT NULL,
122                confidence REAL NOT NULL,
123                status TEXT NOT NULL,
124                created_at TEXT NOT NULL,
125                updated_at TEXT NOT NULL,
126                projected_at TEXT,
127                pr_url TEXT
128            );
129
130            CREATE INDEX IF NOT EXISTS idx_nodes_scope_project ON nodes(scope, project_id, status);
131            CREATE INDEX IF NOT EXISTS idx_nodes_type_status ON nodes(type, status);
132
133            CREATE TABLE IF NOT EXISTS edges (
134                source_id TEXT NOT NULL,
135                target_id TEXT NOT NULL,
136                type TEXT NOT NULL,
137                created_at TEXT NOT NULL,
138                PRIMARY KEY (source_id, target_id, type)
139            );
140
141            CREATE INDEX IF NOT EXISTS idx_edges_target ON edges(target_id, type);
142            CREATE INDEX IF NOT EXISTS idx_edges_source ON edges(source_id, type);
143
144            CREATE TABLE IF NOT EXISTS projects (
145                id TEXT PRIMARY KEY,
146                path TEXT NOT NULL,
147                remote_url TEXT,
148                agent_type TEXT NOT NULL DEFAULT 'claude_code',
149                last_seen TEXT NOT NULL
150            );
151            ",
152        )?;
153
154        // Migrate existing v1 patterns to v2 nodes
155        migrate_patterns_to_nodes(conn)?;
156
157        conn.pragma_update(None, "user_version", 4)?;
158    }
159
160    if current_version < 5 {
161        // For databases upgraded from v4, we need to add the new columns.
162        // For fresh installs, v4 CREATE TABLE already includes them.
163        let has_projected_at: bool = conn
164            .prepare("SELECT projected_at FROM nodes LIMIT 0")
165            .is_ok();
166        if !has_projected_at {
167            conn.execute_batch(
168                "ALTER TABLE nodes ADD COLUMN projected_at TEXT;
169                 ALTER TABLE nodes ADD COLUMN pr_url TEXT;"
170            )?;
171        }
172        conn.pragma_update(None, "user_version", 5)?;
173    }
174
175    if current_version < 6 {
176        // v6: Clean up v1 leftovers now that v2 pipeline is primary.
177        // - Archive all v1 patterns (they've been migrated to nodes in v4)
178        // - Delete pending_review projections (v2 uses nodes table for review)
179        // - Remove bogus project entries with path "/" (from sessions ingested without project context)
180        conn.execute_batch(
181            "UPDATE patterns SET status = 'archived' WHERE status IN ('discovered', 'active');
182             DELETE FROM projections WHERE status = 'pending_review';
183             DELETE FROM projects WHERE path = '/';
184             DELETE FROM ingested_sessions WHERE project = '/';"
185        )?;
186        conn.pragma_update(None, "user_version", 6)?;
187    }
188
189    Ok(())
190}
191
192/// Check if a session has already been ingested and is up-to-date.
193pub fn is_session_ingested(
194    conn: &Connection,
195    session_id: &str,
196    file_size: u64,
197    file_mtime: &str,
198) -> Result<bool, CoreError> {
199    let mut stmt = conn.prepare(
200        "SELECT file_size, file_mtime FROM ingested_sessions WHERE session_id = ?1",
201    )?;
202
203    let result = stmt.query_row(params![session_id], |row| {
204        let size: u64 = row.get(0)?;
205        let mtime: String = row.get(1)?;
206        Ok((size, mtime))
207    });
208
209    match result {
210        Ok((size, mtime)) => Ok(size == file_size && mtime == file_mtime),
211        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
212        Err(e) => Err(CoreError::Database(e.to_string())),
213    }
214}
215
216/// Record a session as ingested.
217pub fn record_ingested_session(
218    conn: &Connection,
219    session: &IngestedSession,
220) -> Result<(), CoreError> {
221    conn.execute(
222        "INSERT OR REPLACE INTO ingested_sessions (session_id, project, session_path, file_size, file_mtime, ingested_at)
223         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
224        params![
225            session.session_id,
226            session.project,
227            session.session_path,
228            session.file_size,
229            session.file_mtime,
230            session.ingested_at.to_rfc3339(),
231        ],
232    )?;
233    Ok(())
234}
235
236/// Get the count of ingested sessions.
237pub fn ingested_session_count(conn: &Connection) -> Result<u64, CoreError> {
238    let count: u64 =
239        conn.query_row("SELECT COUNT(*) FROM ingested_sessions", [], |row| {
240            row.get(0)
241        })?;
242    Ok(count)
243}
244
245/// Get the count of ingested sessions for a specific project.
246pub fn ingested_session_count_for_project(
247    conn: &Connection,
248    project: &str,
249) -> Result<u64, CoreError> {
250    let count: u64 = conn.query_row(
251        "SELECT COUNT(*) FROM ingested_sessions WHERE project = ?1",
252        params![project],
253        |row| row.get(0),
254    )?;
255    Ok(count)
256}
257
258/// Get the count of analyzed sessions.
259pub fn analyzed_session_count(conn: &Connection) -> Result<u64, CoreError> {
260    let count: u64 =
261        conn.query_row("SELECT COUNT(*) FROM analyzed_sessions", [], |row| {
262            row.get(0)
263        })?;
264    Ok(count)
265}
266
267/// Get the count of patterns by status.
268pub fn pattern_count_by_status(conn: &Connection, status: &str) -> Result<u64, CoreError> {
269    let count: u64 = conn.query_row(
270        "SELECT COUNT(*) FROM patterns WHERE status = ?1",
271        params![status],
272        |row| row.get(0),
273    )?;
274    Ok(count)
275}
276
277/// Get the most recent ingestion timestamp.
278pub fn last_ingested_at(conn: &Connection) -> Result<Option<String>, CoreError> {
279    let result = conn.query_row(
280        "SELECT MAX(ingested_at) FROM ingested_sessions",
281        [],
282        |row| row.get::<_, Option<String>>(0),
283    )?;
284    Ok(result)
285}
286
287/// Get the most recent analysis timestamp.
288pub fn last_analyzed_at(conn: &Connection) -> Result<Option<String>, CoreError> {
289    let result = conn.query_row(
290        "SELECT MAX(analyzed_at) FROM analyzed_sessions",
291        [],
292        |row| row.get::<_, Option<String>>(0),
293    )?;
294    Ok(result)
295}
296
297/// Get the most recent projection (apply) timestamp.
298pub fn last_applied_at(conn: &Connection) -> Result<Option<String>, CoreError> {
299    let result = conn.query_row(
300        "SELECT MAX(applied_at) FROM projections",
301        [],
302        |row| row.get::<_, Option<String>>(0),
303    )?;
304    Ok(result)
305}
306
307/// Check if there are ingested sessions that haven't been analyzed yet.
308pub fn has_unanalyzed_sessions(conn: &Connection) -> Result<bool, CoreError> {
309    let count: u64 = conn.query_row(
310        "SELECT COUNT(*) FROM ingested_sessions i
311         LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
312         WHERE a.session_id IS NULL",
313        [],
314        |row| row.get(0),
315    )?;
316    Ok(count > 0)
317}
318
319/// Count ingested sessions that haven't been analyzed yet.
320pub fn unanalyzed_session_count(conn: &Connection) -> Result<u64, CoreError> {
321    let count: u64 = conn.query_row(
322        "SELECT COUNT(*) FROM ingested_sessions i
323         LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
324         WHERE a.session_id IS NULL",
325        [],
326        |row| row.get(0),
327    )?;
328    Ok(count)
329}
330
331/// Check if there are patterns eligible for projection that haven't been projected yet.
332/// Mirrors the gating logic in `get_qualifying_patterns()`: excludes patterns with
333/// generation_failed=true, suggested_target='db_only', or confidence below threshold.
334/// The confidence threshold is the sole quality gate (no times_seen requirement).
335pub fn has_unprojected_patterns(conn: &Connection, confidence_threshold: f64) -> Result<bool, CoreError> {
336    let count: u64 = conn.query_row(
337        "SELECT COUNT(*) FROM patterns p
338         LEFT JOIN projections pr ON p.id = pr.pattern_id
339         WHERE pr.id IS NULL
340         AND p.status IN ('discovered', 'active')
341         AND p.generation_failed = 0
342         AND p.suggested_target != 'db_only'
343         AND p.confidence >= ?1",
344        [confidence_threshold],
345        |row| row.get(0),
346    )?;
347    Ok(count > 0)
348}
349
350/// Get the last nudge timestamp from metadata.
351pub fn get_last_nudge_at(conn: &Connection) -> Result<Option<DateTime<Utc>>, CoreError> {
352    let result: Option<String> = conn
353        .query_row(
354            "SELECT value FROM metadata WHERE key = 'last_nudge_at'",
355            [],
356            |row| row.get(0),
357        )
358        .optional()?;
359
360    match result {
361        Some(s) => match DateTime::parse_from_rfc3339(&s) {
362            Ok(dt) => Ok(Some(dt.with_timezone(&Utc))),
363            Err(_) => Ok(None),
364        },
365        None => Ok(None),
366    }
367}
368
369/// Set the last nudge timestamp in metadata.
370pub fn set_last_nudge_at(conn: &Connection, timestamp: &DateTime<Utc>) -> Result<(), CoreError> {
371    conn.execute(
372        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('last_nudge_at', ?1)",
373        params![timestamp.to_rfc3339()],
374    )?;
375    Ok(())
376}
377
378/// Get a value from the metadata table by key.
379pub fn get_metadata(conn: &Connection, key: &str) -> Result<Option<String>, CoreError> {
380    let result: Option<String> = conn
381        .query_row(
382            "SELECT value FROM metadata WHERE key = ?1",
383            params![key],
384            |row| row.get(0),
385        )
386        .optional()?;
387    Ok(result)
388}
389
390/// Set a value in the metadata table (insert or replace).
391pub fn set_metadata(conn: &Connection, key: &str, value: &str) -> Result<(), CoreError> {
392    conn.execute(
393        "INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)",
394        params![key, value],
395    )?;
396    Ok(())
397}
398
399/// Verify the database is using WAL mode.
400pub fn verify_wal_mode(conn: &Connection) -> Result<bool, CoreError> {
401    let mode: String = conn.pragma_query_value(None, "journal_mode", |row| row.get(0))?;
402    Ok(mode.to_lowercase() == "wal")
403}
404
405/// Get all distinct projects from ingested sessions.
406pub fn list_projects(conn: &Connection) -> Result<Vec<String>, CoreError> {
407    let mut stmt =
408        conn.prepare("SELECT DISTINCT project FROM ingested_sessions ORDER BY project")?;
409    let projects = stmt
410        .query_map([], |row| row.get(0))?
411        .filter_map(|r| r.ok())
412        .collect();
413    Ok(projects)
414}
415
416// ── Pattern operations ──
417
/// Column list used by `SELECT`s against the `patterns` table.
///
/// The order matters: `read_pattern_row` reads these columns by positional
/// index, so the two must be changed together.
const PATTERN_COLUMNS: &str = "id, pattern_type, description, confidence, times_seen, first_seen, last_seen, last_projected, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed";
419
420/// Insert a new pattern into the database.
421pub fn insert_pattern(conn: &Connection, pattern: &Pattern) -> Result<(), CoreError> {
422    let source_sessions =
423        serde_json::to_string(&pattern.source_sessions).unwrap_or_else(|_| "[]".to_string());
424    let related_files =
425        serde_json::to_string(&pattern.related_files).unwrap_or_else(|_| "[]".to_string());
426
427    conn.execute(
428        "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, last_projected, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
429         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
430        params![
431            pattern.id,
432            pattern.pattern_type.to_string(),
433            pattern.description,
434            pattern.confidence,
435            pattern.times_seen,
436            pattern.first_seen.to_rfc3339(),
437            pattern.last_seen.to_rfc3339(),
438            pattern.last_projected.map(|t| t.to_rfc3339()),
439            pattern.status.to_string(),
440            source_sessions,
441            related_files,
442            pattern.suggested_content,
443            pattern.suggested_target.to_string(),
444            pattern.project,
445            pattern.generation_failed as i32,
446        ],
447    )?;
448    Ok(())
449}
450
451/// Update an existing pattern with new evidence (merge).
452pub fn update_pattern_merge(
453    conn: &Connection,
454    id: &str,
455    new_sessions: &[String],
456    new_confidence: f64,
457    new_last_seen: DateTime<Utc>,
458    additional_times_seen: i64,
459) -> Result<(), CoreError> {
460    // Load existing source_sessions and merge
461    let existing_sessions: String = conn.query_row(
462        "SELECT source_sessions FROM patterns WHERE id = ?1",
463        params![id],
464        |row| row.get(0),
465    )?;
466
467    let mut sessions: Vec<String> =
468        serde_json::from_str(&existing_sessions).unwrap_or_default();
469    for s in new_sessions {
470        if !sessions.contains(s) {
471            sessions.push(s.clone());
472        }
473    }
474    let merged_sessions = serde_json::to_string(&sessions).unwrap_or_else(|_| "[]".to_string());
475
476    conn.execute(
477        "UPDATE patterns SET
478            confidence = MAX(confidence, ?2),
479            times_seen = times_seen + ?3,
480            last_seen = ?4,
481            source_sessions = ?5
482         WHERE id = ?1",
483        params![
484            id,
485            new_confidence,
486            additional_times_seen,
487            new_last_seen.to_rfc3339(),
488            merged_sessions,
489        ],
490    )?;
491    Ok(())
492}
493
494/// Get patterns filtered by status and optionally by project.
495pub fn get_patterns(
496    conn: &Connection,
497    statuses: &[&str],
498    project: Option<&str>,
499) -> Result<Vec<Pattern>, CoreError> {
500    if statuses.is_empty() {
501        return Ok(Vec::new());
502    }
503
504    let placeholders: Vec<String> = statuses.iter().enumerate().map(|(i, _)| format!("?{}", i + 1)).collect();
505    let status_clause = placeholders.join(", ");
506
507    let (query, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match project {
508        Some(proj) => {
509            let q = format!(
510                "SELECT {PATTERN_COLUMNS}
511                 FROM patterns WHERE status IN ({}) AND (project = ?{} OR project IS NULL)
512                 ORDER BY confidence DESC",
513                status_clause,
514                statuses.len() + 1
515            );
516            let mut p: Vec<Box<dyn rusqlite::types::ToSql>> = statuses.iter().map(|s| Box::new(s.to_string()) as Box<dyn rusqlite::types::ToSql>).collect();
517            p.push(Box::new(proj.to_string()));
518            (q, p)
519        }
520        None => {
521            let q = format!(
522                "SELECT {PATTERN_COLUMNS}
523                 FROM patterns WHERE status IN ({})
524                 ORDER BY confidence DESC",
525                status_clause
526            );
527            let p: Vec<Box<dyn rusqlite::types::ToSql>> = statuses.iter().map(|s| Box::new(s.to_string()) as Box<dyn rusqlite::types::ToSql>).collect();
528            (q, p)
529        }
530    };
531
532    let params_refs: Vec<&dyn rusqlite::types::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
533    let mut stmt = conn.prepare(&query)?;
534    let patterns = stmt
535        .query_map(params_refs.as_slice(), |row| {
536            Ok(read_pattern_row(row))
537        })?
538        .filter_map(|r| r.ok())
539        .collect();
540
541    Ok(patterns)
542}
543
544/// Get all patterns, optionally filtered by project.
545pub fn get_all_patterns(conn: &Connection, project: Option<&str>) -> Result<Vec<Pattern>, CoreError> {
546    let (query, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match project {
547        Some(proj) => {
548            let q = format!(
549                "SELECT {PATTERN_COLUMNS}
550                 FROM patterns WHERE project = ?1 OR project IS NULL
551                 ORDER BY confidence DESC"
552            );
553            (q, vec![Box::new(proj.to_string()) as Box<dyn rusqlite::types::ToSql>])
554        }
555        None => {
556            let q = format!(
557                "SELECT {PATTERN_COLUMNS}
558                 FROM patterns ORDER BY confidence DESC"
559            );
560            (q, vec![])
561        }
562    };
563
564    let params_refs: Vec<&dyn rusqlite::types::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
565    let mut stmt = conn.prepare(&query)?;
566    let patterns = stmt
567        .query_map(params_refs.as_slice(), |row| Ok(read_pattern_row(row)))?
568        .filter_map(|r| r.ok())
569        .collect();
570
571    Ok(patterns)
572}
573
574fn read_pattern_row(row: &rusqlite::Row<'_>) -> Pattern {
575    let source_sessions_str: String = row.get(9).unwrap_or_default();
576    let related_files_str: String = row.get(10).unwrap_or_default();
577    let first_seen_str: String = row.get(5).unwrap_or_default();
578    let last_seen_str: String = row.get(6).unwrap_or_default();
579    let last_projected_str: Option<String> = row.get(7).unwrap_or(None);
580    let gen_failed: i32 = row.get(14).unwrap_or(0);
581
582    Pattern {
583        id: row.get(0).unwrap_or_default(),
584        pattern_type: PatternType::from_str(&row.get::<_, String>(1).unwrap_or_default()),
585        description: row.get(2).unwrap_or_default(),
586        confidence: row.get(3).unwrap_or(0.0),
587        times_seen: row.get(4).unwrap_or(1),
588        first_seen: DateTime::parse_from_rfc3339(&first_seen_str)
589            .map(|d| d.with_timezone(&Utc))
590            .unwrap_or_else(|_| Utc::now()),
591        last_seen: DateTime::parse_from_rfc3339(&last_seen_str)
592            .map(|d| d.with_timezone(&Utc))
593            .unwrap_or_else(|_| Utc::now()),
594        last_projected: last_projected_str
595            .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
596            .map(|d| d.with_timezone(&Utc)),
597        status: PatternStatus::from_str(&row.get::<_, String>(8).unwrap_or_default()),
598        source_sessions: serde_json::from_str(&source_sessions_str).unwrap_or_default(),
599        related_files: serde_json::from_str(&related_files_str).unwrap_or_default(),
600        suggested_content: row.get(11).unwrap_or_default(),
601        suggested_target: SuggestedTarget::from_str(&row.get::<_, String>(12).unwrap_or_default()),
602        project: row.get(13).unwrap_or(None),
603        generation_failed: gen_failed != 0,
604    }
605}
606
607// ── Analyzed session tracking ──
608
609/// Record a session as analyzed.
610pub fn record_analyzed_session(
611    conn: &Connection,
612    session_id: &str,
613    project: &str,
614) -> Result<(), CoreError> {
615    conn.execute(
616        "INSERT OR REPLACE INTO analyzed_sessions (session_id, project, analyzed_at)
617         VALUES (?1, ?2, ?3)",
618        params![session_id, project, Utc::now().to_rfc3339()],
619    )?;
620    Ok(())
621}
622
623/// Check if a session has been analyzed.
624pub fn is_session_analyzed(conn: &Connection, session_id: &str) -> Result<bool, CoreError> {
625    let count: u64 = conn.query_row(
626        "SELECT COUNT(*) FROM analyzed_sessions WHERE session_id = ?1",
627        params![session_id],
628        |row| row.get(0),
629    )?;
630    Ok(count > 0)
631}
632
633/// Get ingested sessions for analysis within the time window.
634/// When `rolling_window` is true, returns ALL sessions in the window (re-analyzes everything).
635/// When false, only returns sessions not yet in `analyzed_sessions` (analyze-once).
636pub fn get_sessions_for_analysis(
637    conn: &Connection,
638    project: Option<&str>,
639    since: &DateTime<Utc>,
640    rolling_window: bool,
641) -> Result<Vec<IngestedSession>, CoreError> {
642    let since_str = since.to_rfc3339();
643
644    let (query, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match (project, rolling_window) {
645        (Some(proj), true) => {
646            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
647                     FROM ingested_sessions i
648                     WHERE i.project = ?1 AND i.ingested_at >= ?2
649                     ORDER BY i.ingested_at".to_string();
650            (q, vec![
651                Box::new(proj.to_string()) as Box<dyn rusqlite::types::ToSql>,
652                Box::new(since_str) as Box<dyn rusqlite::types::ToSql>,
653            ])
654        }
655        (Some(proj), false) => {
656            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
657                     FROM ingested_sessions i
658                     LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
659                     WHERE a.session_id IS NULL AND i.project = ?1 AND i.ingested_at >= ?2
660                     ORDER BY i.ingested_at".to_string();
661            (q, vec![
662                Box::new(proj.to_string()) as Box<dyn rusqlite::types::ToSql>,
663                Box::new(since_str) as Box<dyn rusqlite::types::ToSql>,
664            ])
665        }
666        (None, true) => {
667            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
668                     FROM ingested_sessions i
669                     WHERE i.ingested_at >= ?1
670                     ORDER BY i.ingested_at".to_string();
671            (q, vec![Box::new(since_str) as Box<dyn rusqlite::types::ToSql>])
672        }
673        (None, false) => {
674            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
675                     FROM ingested_sessions i
676                     LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
677                     WHERE a.session_id IS NULL AND i.ingested_at >= ?1
678                     ORDER BY i.ingested_at".to_string();
679            (q, vec![Box::new(since_str) as Box<dyn rusqlite::types::ToSql>])
680        }
681    };
682
683    let params_refs: Vec<&dyn rusqlite::types::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
684    let mut stmt = conn.prepare(&query)?;
685    let sessions = stmt
686        .query_map(params_refs.as_slice(), |row| {
687            let ingested_at_str: String = row.get(5)?;
688            let ingested_at = DateTime::parse_from_rfc3339(&ingested_at_str)
689                .map(|d| d.with_timezone(&Utc))
690                .unwrap_or_else(|_| Utc::now());
691            Ok(IngestedSession {
692                session_id: row.get(0)?,
693                project: row.get(1)?,
694                session_path: row.get(2)?,
695                file_size: row.get(3)?,
696                file_mtime: row.get(4)?,
697                ingested_at,
698            })
699        })?
700        .filter_map(|r| r.ok())
701        .collect();
702
703    Ok(sessions)
704}
705
706// ── Projection operations ──
707
708/// Insert a new projection record.
709pub fn insert_projection(conn: &Connection, proj: &Projection) -> Result<(), CoreError> {
710    conn.execute(
711        "INSERT INTO projections (id, pattern_id, target_type, target_path, content, applied_at, pr_url, status)
712         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
713        params![
714            proj.id,
715            proj.pattern_id,
716            proj.target_type,
717            proj.target_path,
718            proj.content,
719            proj.applied_at.to_rfc3339(),
720            proj.pr_url,
721            proj.status.to_string(),
722        ],
723    )?;
724    Ok(())
725}
726
727/// Check if a pattern already has an active projection.
728pub fn has_projection_for_pattern(conn: &Connection, pattern_id: &str) -> Result<bool, CoreError> {
729    let count: u64 = conn.query_row(
730        "SELECT COUNT(*) FROM projections WHERE pattern_id = ?1",
731        params![pattern_id],
732        |row| row.get(0),
733    )?;
734    Ok(count > 0)
735}
736
737/// Get the set of all pattern IDs that already have projections.
738pub fn get_projected_pattern_ids(
739    conn: &Connection,
740) -> Result<std::collections::HashSet<String>, CoreError> {
741    let mut stmt = conn.prepare("SELECT DISTINCT pattern_id FROM projections")?;
742    let ids = stmt
743        .query_map([], |row| row.get(0))?
744        .filter_map(|r| r.ok())
745        .collect();
746    Ok(ids)
747}
748
749/// Update a pattern's status.
750pub fn update_pattern_status(
751    conn: &Connection,
752    id: &str,
753    status: &PatternStatus,
754) -> Result<(), CoreError> {
755    conn.execute(
756        "UPDATE patterns SET status = ?2 WHERE id = ?1",
757        params![id, status.to_string()],
758    )?;
759    Ok(())
760}
761
762/// Set or clear the generation_failed flag on a pattern.
763pub fn set_generation_failed(
764    conn: &Connection,
765    id: &str,
766    failed: bool,
767) -> Result<(), CoreError> {
768    conn.execute(
769        "UPDATE patterns SET generation_failed = ?2 WHERE id = ?1",
770        params![id, failed as i32],
771    )?;
772    Ok(())
773}
774
775/// Get all projections for active patterns (for staleness detection).
776pub fn get_projections_for_active_patterns(
777    conn: &Connection,
778) -> Result<Vec<Projection>, CoreError> {
779    let mut stmt = conn.prepare(
780        "SELECT p.id, p.pattern_id, p.target_type, p.target_path, p.content, p.applied_at, p.pr_url, p.status
781         FROM projections p
782         INNER JOIN patterns pat ON p.pattern_id = pat.id
783         WHERE pat.status = 'active'",
784    )?;
785
786    let projections = stmt
787        .query_map([], |row| {
788            let applied_at_str: String = row.get(5)?;
789            let applied_at = DateTime::parse_from_rfc3339(&applied_at_str)
790                .map(|d| d.with_timezone(&Utc))
791                .unwrap_or_else(|_| Utc::now());
792            let status_str: String = row.get(7)?;
793            let status = ProjectionStatus::from_str(&status_str)
794                .unwrap_or(ProjectionStatus::Applied);
795            Ok(Projection {
796                id: row.get(0)?,
797                pattern_id: row.get(1)?,
798                target_type: row.get(2)?,
799                target_path: row.get(3)?,
800                content: row.get(4)?,
801                applied_at,
802                pr_url: row.get(6)?,
803                status,
804            })
805        })?
806        .filter_map(|r| r.ok())
807        .collect();
808
809    Ok(projections)
810}
811
812/// Update a pattern's last_projected timestamp to now.
813pub fn update_pattern_last_projected(conn: &Connection, id: &str) -> Result<(), CoreError> {
814    conn.execute(
815        "UPDATE patterns SET last_projected = ?2 WHERE id = ?1",
816        params![id, Utc::now().to_rfc3339()],
817    )?;
818    Ok(())
819}
820
821/// Get all projections with pending_review status.
822pub fn get_pending_review_projections(conn: &Connection) -> Result<Vec<Projection>, CoreError> {
823    let mut stmt = conn.prepare(
824        "SELECT p.id, p.pattern_id, p.target_type, p.target_path, p.content, p.applied_at, p.pr_url, p.status
825         FROM projections p
826         WHERE p.status = 'pending_review'
827         ORDER BY p.applied_at ASC",
828    )?;
829
830    let projections = stmt
831        .query_map([], |row| {
832            let applied_at_str: String = row.get(5)?;
833            let applied_at = DateTime::parse_from_rfc3339(&applied_at_str)
834                .map(|d| d.with_timezone(&Utc))
835                .unwrap_or_else(|_| Utc::now());
836            let status_str: String = row.get(7)?;
837            let status = ProjectionStatus::from_str(&status_str)
838                .unwrap_or(ProjectionStatus::PendingReview);
839            Ok(Projection {
840                id: row.get(0)?,
841                pattern_id: row.get(1)?,
842                target_type: row.get(2)?,
843                target_path: row.get(3)?,
844                content: row.get(4)?,
845                applied_at,
846                pr_url: row.get(6)?,
847                status,
848            })
849        })?
850        .filter_map(|r| r.ok())
851        .collect();
852
853    Ok(projections)
854}
855
856/// Update a projection's status.
857pub fn update_projection_status(
858    conn: &Connection,
859    projection_id: &str,
860    status: &ProjectionStatus,
861) -> Result<(), CoreError> {
862    conn.execute(
863        "UPDATE projections SET status = ?2 WHERE id = ?1",
864        params![projection_id, status.to_string()],
865    )?;
866    Ok(())
867}
868
869/// Delete a projection record.
870pub fn delete_projection(conn: &Connection, projection_id: &str) -> Result<(), CoreError> {
871    conn.execute("DELETE FROM projections WHERE id = ?1", params![projection_id])?;
872    Ok(())
873}
874
875/// Get applied projections that have a PR URL (for sync).
876pub fn get_applied_projections_with_pr(conn: &Connection) -> Result<Vec<Projection>, CoreError> {
877    let mut stmt = conn.prepare(
878        "SELECT p.id, p.pattern_id, p.target_type, p.target_path, p.content, p.applied_at, p.pr_url, p.status
879         FROM projections p
880         WHERE p.status = 'applied' AND p.pr_url IS NOT NULL",
881    )?;
882
883    let projections = stmt
884        .query_map([], |row| {
885            let applied_at_str: String = row.get(5)?;
886            let applied_at = DateTime::parse_from_rfc3339(&applied_at_str)
887                .map(|d| d.with_timezone(&Utc))
888                .unwrap_or_else(|_| Utc::now());
889            let status_str: String = row.get(7)?;
890            let status = ProjectionStatus::from_str(&status_str)
891                .unwrap_or(ProjectionStatus::Applied);
892            Ok(Projection {
893                id: row.get(0)?,
894                pattern_id: row.get(1)?,
895                target_type: row.get(2)?,
896                target_path: row.get(3)?,
897                content: row.get(4)?,
898                applied_at,
899                pr_url: row.get(6)?,
900                status,
901            })
902        })?
903        .filter_map(|r| r.ok())
904        .collect();
905
906    Ok(projections)
907}
908
909/// Get pattern IDs that have projections with specific statuses.
910pub fn get_projected_pattern_ids_by_status(
911    conn: &Connection,
912    statuses: &[ProjectionStatus],
913) -> Result<std::collections::HashSet<String>, CoreError> {
914    if statuses.is_empty() {
915        return Ok(std::collections::HashSet::new());
916    }
917    let placeholders: Vec<String> = statuses.iter().enumerate().map(|(i, _)| format!("?{}", i + 1)).collect();
918    let sql = format!(
919        "SELECT DISTINCT pattern_id FROM projections WHERE status IN ({})",
920        placeholders.join(", ")
921    );
922    let mut stmt = conn.prepare(&sql)?;
923    let params: Vec<String> = statuses.iter().map(|s| s.to_string()).collect();
924    let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|s| s as &dyn rusqlite::types::ToSql).collect();
925    let ids = stmt
926        .query_map(param_refs.as_slice(), |row| row.get(0))?
927        .filter_map(|r| r.ok())
928        .collect();
929    Ok(ids)
930}
931
932/// Update a projection's PR URL.
933pub fn update_projection_pr_url(
934    conn: &Connection,
935    projection_id: &str,
936    pr_url: &str,
937) -> Result<(), CoreError> {
938    conn.execute(
939        "UPDATE projections SET pr_url = ?2 WHERE id = ?1",
940        params![projection_id, pr_url],
941    )?;
942    Ok(())
943}
944
945// ── Knowledge graph: Migration ──
946
947/// Migrate v1 patterns to v2 knowledge nodes. Returns number of nodes created.
948/// Safe to call multiple times — skips patterns that already have corresponding nodes.
949pub fn migrate_patterns_to_nodes(conn: &Connection) -> Result<usize, CoreError> {
950    let mut stmt = conn.prepare(
951        "SELECT id, pattern_type, description, confidence, status, suggested_content, suggested_target, project, first_seen, last_seen
952         FROM patterns",
953    )?;
954    let patterns: Vec<(String, String, String, f64, String, String, String, Option<String>, String, String)> = stmt.query_map([], |row| {
955        Ok((
956            row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?,
957            row.get(4)?, row.get(5)?, row.get(6)?, row.get(7)?,
958            row.get(8)?, row.get(9)?,
959        ))
960    })?.filter_map(|r| r.ok()).collect();
961
962    let mut count = 0;
963    for (id, pattern_type, description, confidence, _status, suggested_content, suggested_target, project, first_seen, last_seen) in &patterns {
964        let node_id = format!("migrated-{id}");
965
966        // Skip if already migrated
967        if get_node(conn, &node_id)?.is_some() {
968            continue;
969        }
970
971        // Determine node type using the spec's deterministic mapping
972        let content_lower = suggested_content.to_lowercase();
973        let has_directive_keyword = content_lower.contains("always") || content_lower.contains("never");
974        let node_type = if *confidence >= 0.85 && has_directive_keyword {
975            NodeType::Directive
976        } else {
977            match (pattern_type.as_str(), suggested_target.as_str()) {
978                ("repetitive_instruction", "claude_md") => NodeType::Rule,
979                ("repetitive_instruction", "skill") => NodeType::Directive,
980                ("recurring_mistake", _) => NodeType::Pattern,
981                ("workflow_pattern", "skill") => NodeType::Skill,
982                ("workflow_pattern", "claude_md") => NodeType::Rule,
983                ("stale_context", _) => NodeType::Memory,
984                ("redundant_context", _) => NodeType::Memory,
985                _ => NodeType::Pattern,
986            }
987        };
988
989        let created_at = DateTime::parse_from_rfc3339(first_seen)
990            .unwrap_or_default()
991            .with_timezone(&Utc);
992        let updated_at = DateTime::parse_from_rfc3339(last_seen)
993            .unwrap_or_default()
994            .with_timezone(&Utc);
995
996        let node = KnowledgeNode {
997            id: node_id,
998            node_type,
999            scope: NodeScope::Project,
1000            project_id: project.clone(),
1001            content: description.clone(),
1002            confidence: *confidence,
1003            status: NodeStatus::Active,
1004            created_at,
1005            updated_at,
1006            projected_at: None,
1007            pr_url: None,
1008        };
1009        insert_node(conn, &node)?;
1010        count += 1;
1011    }
1012    Ok(count)
1013}
1014
1015// ── Knowledge graph: Node operations ──
1016
1017pub fn insert_node(conn: &Connection, node: &KnowledgeNode) -> Result<(), CoreError> {
1018    conn.execute(
1019        "INSERT INTO nodes (id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url)
1020         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
1021        params![
1022            node.id,
1023            node.node_type.to_string(),
1024            node.scope.to_string(),
1025            node.project_id,
1026            node.content,
1027            node.confidence,
1028            node.status.to_string(),
1029            node.created_at.to_rfc3339(),
1030            node.updated_at.to_rfc3339(),
1031            node.projected_at,
1032            node.pr_url,
1033        ],
1034    )?;
1035    Ok(())
1036}
1037
1038pub fn get_node(conn: &Connection, id: &str) -> Result<Option<KnowledgeNode>, CoreError> {
1039    let mut stmt = conn.prepare(
1040        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1041         FROM nodes WHERE id = ?1",
1042    )?;
1043    let result = stmt.query_row(params![id], |row| {
1044        Ok(KnowledgeNode {
1045            id: row.get(0)?,
1046            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1047            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1048            project_id: row.get(3)?,
1049            content: row.get(4)?,
1050            confidence: row.get(5)?,
1051            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1052            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1053                .unwrap_or_default()
1054                .with_timezone(&Utc),
1055            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1056                .unwrap_or_default()
1057                .with_timezone(&Utc),
1058            projected_at: row.get(9)?,
1059            pr_url: row.get(10)?,
1060        })
1061    }).optional()?;
1062    Ok(result)
1063}
1064
1065pub fn get_nodes_by_scope(
1066    conn: &Connection,
1067    scope: &NodeScope,
1068    project_id: Option<&str>,
1069    statuses: &[NodeStatus],
1070) -> Result<Vec<KnowledgeNode>, CoreError> {
1071    if statuses.is_empty() {
1072        return Ok(Vec::new());
1073    }
1074    let status_placeholders: Vec<String> = statuses.iter().enumerate().map(|(i, _)| format!("?{}", i + 3)).collect();
1075    let sql = format!(
1076        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1077         FROM nodes WHERE scope = ?1 AND (?2 IS NULL OR project_id = ?2) AND status IN ({})
1078         ORDER BY confidence DESC",
1079        status_placeholders.join(", ")
1080    );
1081    let mut stmt = conn.prepare(&sql)?;
1082    let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = vec![
1083        Box::new(scope.to_string()),
1084        Box::new(project_id.map(|s| s.to_string())),
1085    ];
1086    for s in statuses {
1087        params_vec.push(Box::new(s.to_string()));
1088    }
1089    let rows = stmt.query_map(rusqlite::params_from_iter(params_vec.iter().map(|p| p.as_ref())), |row| {
1090        Ok(KnowledgeNode {
1091            id: row.get(0)?,
1092            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1093            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1094            project_id: row.get(3)?,
1095            content: row.get(4)?,
1096            confidence: row.get(5)?,
1097            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1098            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1099                .unwrap_or_default()
1100                .with_timezone(&Utc),
1101            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1102                .unwrap_or_default()
1103                .with_timezone(&Utc),
1104            projected_at: row.get(9)?,
1105            pr_url: row.get(10)?,
1106        })
1107    })?;
1108    let mut nodes = Vec::new();
1109    for row in rows {
1110        nodes.push(row?);
1111    }
1112    Ok(nodes)
1113}
1114
1115/// Get all nodes with a given status, ordered by confidence DESC.
1116pub fn get_nodes_by_status(
1117    conn: &Connection,
1118    status: &NodeStatus,
1119) -> Result<Vec<KnowledgeNode>, CoreError> {
1120    let mut stmt = conn.prepare(
1121        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1122         FROM nodes WHERE status = ?1
1123         ORDER BY confidence DESC",
1124    )?;
1125    let rows = stmt.query_map(params![status.to_string()], |row| {
1126        Ok(KnowledgeNode {
1127            id: row.get(0)?,
1128            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1129            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1130            project_id: row.get(3)?,
1131            content: row.get(4)?,
1132            confidence: row.get(5)?,
1133            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1134            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1135                .unwrap_or_default()
1136                .with_timezone(&Utc),
1137            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1138                .unwrap_or_default()
1139                .with_timezone(&Utc),
1140            projected_at: row.get(9)?,
1141            pr_url: row.get(10)?,
1142        })
1143    })?;
1144    let mut nodes = Vec::new();
1145    for row in rows {
1146        nodes.push(row?);
1147    }
1148    Ok(nodes)
1149}
1150
1151pub fn update_node_confidence(conn: &Connection, id: &str, confidence: f64) -> Result<(), CoreError> {
1152    conn.execute(
1153        "UPDATE nodes SET confidence = ?1, updated_at = ?2 WHERE id = ?3",
1154        params![confidence, Utc::now().to_rfc3339(), id],
1155    )?;
1156    Ok(())
1157}
1158
1159pub fn update_node_status(conn: &Connection, id: &str, status: &NodeStatus) -> Result<(), CoreError> {
1160    conn.execute(
1161        "UPDATE nodes SET status = ?1, updated_at = ?2 WHERE id = ?3",
1162        params![status.to_string(), Utc::now().to_rfc3339(), id],
1163    )?;
1164    Ok(())
1165}
1166
1167pub fn update_node_content(conn: &Connection, id: &str, content: &str) -> Result<(), CoreError> {
1168    conn.execute(
1169        "UPDATE nodes SET content = ?1, updated_at = ?2 WHERE id = ?3",
1170        params![content, Utc::now().to_rfc3339(), id],
1171    )?;
1172    Ok(())
1173}
1174
1175// ── Knowledge graph: Edge operations ──
1176
1177pub fn insert_edge(conn: &Connection, edge: &KnowledgeEdge) -> Result<(), CoreError> {
1178    conn.execute(
1179        "INSERT OR IGNORE INTO edges (source_id, target_id, type, created_at)
1180         VALUES (?1, ?2, ?3, ?4)",
1181        params![
1182            edge.source_id,
1183            edge.target_id,
1184            edge.edge_type.to_string(),
1185            edge.created_at.to_rfc3339(),
1186        ],
1187    )?;
1188    Ok(())
1189}
1190
1191pub fn get_edges_from(conn: &Connection, source_id: &str) -> Result<Vec<KnowledgeEdge>, CoreError> {
1192    let mut stmt = conn.prepare(
1193        "SELECT source_id, target_id, type, created_at FROM edges WHERE source_id = ?1",
1194    )?;
1195    let rows = stmt.query_map(params![source_id], |row| {
1196        Ok(KnowledgeEdge {
1197            source_id: row.get(0)?,
1198            target_id: row.get(1)?,
1199            edge_type: EdgeType::from_str(&row.get::<_, String>(2)?).unwrap_or(EdgeType::Supports),
1200            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?)
1201                .unwrap_or_default()
1202                .with_timezone(&Utc),
1203        })
1204    })?;
1205    let mut edges = Vec::new();
1206    for row in rows {
1207        edges.push(row?);
1208    }
1209    Ok(edges)
1210}
1211
1212pub fn get_edges_to(conn: &Connection, target_id: &str) -> Result<Vec<KnowledgeEdge>, CoreError> {
1213    let mut stmt = conn.prepare(
1214        "SELECT source_id, target_id, type, created_at FROM edges WHERE target_id = ?1",
1215    )?;
1216    let rows = stmt.query_map(params![target_id], |row| {
1217        Ok(KnowledgeEdge {
1218            source_id: row.get(0)?,
1219            target_id: row.get(1)?,
1220            edge_type: EdgeType::from_str(&row.get::<_, String>(2)?).unwrap_or(EdgeType::Supports),
1221            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?)
1222                .unwrap_or_default()
1223                .with_timezone(&Utc),
1224        })
1225    })?;
1226    let mut edges = Vec::new();
1227    for row in rows {
1228        edges.push(row?);
1229    }
1230    Ok(edges)
1231}
1232
1233pub fn delete_edge(conn: &Connection, source_id: &str, target_id: &str, edge_type: &EdgeType) -> Result<(), CoreError> {
1234    conn.execute(
1235        "DELETE FROM edges WHERE source_id = ?1 AND target_id = ?2 AND type = ?3",
1236        params![source_id, target_id, edge_type.to_string()],
1237    )?;
1238    Ok(())
1239}
1240
1241/// Mark new_id as superseding old_id: archives old node and creates supersedes edge.
1242pub fn supersede_node(conn: &Connection, new_id: &str, old_id: &str) -> Result<(), CoreError> {
1243    update_node_status(conn, old_id, &NodeStatus::Archived)?;
1244    let edge = KnowledgeEdge {
1245        source_id: new_id.to_string(),
1246        target_id: old_id.to_string(),
1247        edge_type: EdgeType::Supersedes,
1248        created_at: Utc::now(),
1249    };
1250    insert_edge(conn, &edge)?;
1251    Ok(())
1252}
1253
/// Result of applying a batch of graph operations.
///
/// Each field counts how many operations of the corresponding kind were
/// processed by `apply_graph_operations`. `Default` yields all-zero counters,
/// and the type is comparable so results can be checked with `assert_eq!`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ApplyGraphResult {
    /// Number of `CreateNode` operations applied.
    pub nodes_created: usize,
    /// Number of `UpdateNode` operations applied.
    pub nodes_updated: usize,
    /// Number of `CreateEdge` operations applied.
    pub edges_created: usize,
    /// Number of `MergeNodes` operations applied.
    pub nodes_merged: usize,
}
1262
1263/// Apply a batch of graph operations to the database.
1264pub fn apply_graph_operations(conn: &Connection, ops: &[GraphOperation]) -> Result<ApplyGraphResult, CoreError> {
1265    let mut result = ApplyGraphResult::default();
1266
1267    for op in ops {
1268        match op {
1269            GraphOperation::CreateNode { node_type, scope, project_id, content, confidence } => {
1270                let node = KnowledgeNode {
1271                    id: uuid::Uuid::new_v4().to_string(),
1272                    node_type: node_type.clone(),
1273                    scope: scope.clone(),
1274                    project_id: project_id.clone(),
1275                    content: content.clone(),
1276                    confidence: *confidence,
1277                    status: NodeStatus::Active,
1278                    created_at: Utc::now(),
1279                    updated_at: Utc::now(),
1280                    projected_at: None,
1281                    pr_url: None,
1282                };
1283                insert_node(conn, &node)?;
1284                result.nodes_created += 1;
1285            }
1286            GraphOperation::UpdateNode { id, confidence, content } => {
1287                if let Some(conf) = confidence {
1288                    update_node_confidence(conn, id, *conf)?;
1289                }
1290                if let Some(cont) = content {
1291                    update_node_content(conn, id, cont)?;
1292                }
1293                result.nodes_updated += 1;
1294            }
1295            GraphOperation::CreateEdge { source_id, target_id, edge_type } => {
1296                let edge = KnowledgeEdge {
1297                    source_id: source_id.clone(),
1298                    target_id: target_id.clone(),
1299                    edge_type: edge_type.clone(),
1300                    created_at: Utc::now(),
1301                };
1302                insert_edge(conn, &edge)?;
1303                result.edges_created += 1;
1304            }
1305            GraphOperation::MergeNodes { keep_id, remove_id } => {
1306                supersede_node(conn, keep_id, remove_id)?;
1307                result.nodes_merged += 1;
1308            }
1309        }
1310    }
1311
1312    Ok(result)
1313}
1314
1315// ── Knowledge graph: Project operations ──
1316
/// Generate a human-readable slug from a repository path.
///
/// Takes the final path component, lowercases it, and turns every
/// non-alphanumeric character into a hyphen. Leading and trailing hyphens are
/// dropped and runs of hyphens collapse to one. Returns `"unnamed-project"`
/// when the path has no usable final component or nothing alphanumeric
/// survives.
pub fn generate_project_slug(repo_path: &str) -> String {
    const FALLBACK: &str = "unnamed-project";

    let base_name = match std::path::Path::new(repo_path)
        .file_name()
        .and_then(|component| component.to_str())
    {
        Some(text) => text,
        None => FALLBACK,
    };

    let hyphenated: String = base_name
        .to_lowercase()
        .chars()
        .map(|ch| if ch.is_alphanumeric() { ch } else { '-' })
        .collect();

    // Splitting on '-' and discarding empty pieces trims the edges and
    // collapses repeated hyphens in a single step.
    let words: Vec<&str> = hyphenated
        .split('-')
        .filter(|word| !word.is_empty())
        .collect();

    if words.is_empty() {
        FALLBACK.to_string()
    } else {
        words.join("-")
    }
}
1350
1351/// Generate a unique project slug, appending -2, -3, etc. if needed.
1352pub fn generate_unique_project_slug(conn: &Connection, repo_path: &str) -> Result<String, CoreError> {
1353    let base = generate_project_slug(repo_path);
1354    if get_project(conn, &base)?.is_none() {
1355        return Ok(base);
1356    }
1357    for i in 2..100 {
1358        let candidate = format!("{base}-{i}");
1359        if get_project(conn, &candidate)?.is_none() {
1360            return Ok(candidate);
1361        }
1362    }
1363    Ok(format!("{base}-{}", &uuid::Uuid::new_v4().to_string()[..8]))
1364}
1365
1366pub fn upsert_project(conn: &Connection, project: &KnowledgeProject) -> Result<(), CoreError> {
1367    conn.execute(
1368        "INSERT INTO projects (id, path, remote_url, agent_type, last_seen)
1369         VALUES (?1, ?2, ?3, ?4, ?5)
1370         ON CONFLICT(id) DO UPDATE SET
1371             path = excluded.path,
1372             remote_url = COALESCE(excluded.remote_url, projects.remote_url),
1373             last_seen = excluded.last_seen",
1374        params![
1375            project.id,
1376            project.path,
1377            project.remote_url,
1378            project.agent_type,
1379            project.last_seen.to_rfc3339(),
1380        ],
1381    )?;
1382    Ok(())
1383}
1384
1385pub fn get_project(conn: &Connection, id: &str) -> Result<Option<KnowledgeProject>, CoreError> {
1386    let mut stmt = conn.prepare(
1387        "SELECT id, path, remote_url, agent_type, last_seen FROM projects WHERE id = ?1",
1388    )?;
1389    let result = stmt.query_row(params![id], |row| {
1390        Ok(KnowledgeProject {
1391            id: row.get(0)?,
1392            path: row.get(1)?,
1393            remote_url: row.get(2)?,
1394            agent_type: row.get(3)?,
1395            last_seen: DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)
1396                .unwrap_or_default()
1397                .with_timezone(&Utc),
1398        })
1399    }).optional()?;
1400    Ok(result)
1401}
1402
1403pub fn get_project_by_remote_url(conn: &Connection, remote_url: &str) -> Result<Option<KnowledgeProject>, CoreError> {
1404    let mut stmt = conn.prepare(
1405        "SELECT id, path, remote_url, agent_type, last_seen FROM projects WHERE remote_url = ?1",
1406    )?;
1407    let result = stmt.query_row(params![remote_url], |row| {
1408        Ok(KnowledgeProject {
1409            id: row.get(0)?,
1410            path: row.get(1)?,
1411            remote_url: row.get(2)?,
1412            agent_type: row.get(3)?,
1413            last_seen: DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)
1414                .unwrap_or_default()
1415                .with_timezone(&Utc),
1416        })
1417    }).optional()?;
1418    Ok(result)
1419}
1420
1421/// Get active nodes that haven't been projected yet, ordered by confidence DESC.
1422pub fn get_unprojected_nodes(conn: &Connection) -> Result<Vec<KnowledgeNode>, CoreError> {
1423    let mut stmt = conn.prepare(
1424        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1425         FROM nodes WHERE status = 'active' AND projected_at IS NULL
1426         ORDER BY confidence DESC",
1427    )?;
1428    let rows = stmt.query_map([], |row| {
1429        Ok(KnowledgeNode {
1430            id: row.get(0)?,
1431            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1432            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1433            project_id: row.get(3)?,
1434            content: row.get(4)?,
1435            confidence: row.get(5)?,
1436            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1437            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1438                .unwrap_or_default()
1439                .with_timezone(&Utc),
1440            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1441                .unwrap_or_default()
1442                .with_timezone(&Utc),
1443            projected_at: row.get(9)?,
1444            pr_url: row.get(10)?,
1445        })
1446    })?;
1447    let mut nodes = Vec::new();
1448    for row in rows {
1449        nodes.push(row?);
1450    }
1451    Ok(nodes)
1452}
1453
1454/// Mark a node as projected (direct write, no PR).
1455pub fn mark_node_projected(conn: &Connection, id: &str) -> Result<(), CoreError> {
1456    conn.execute(
1457        "UPDATE nodes SET projected_at = ?1 WHERE id = ?2",
1458        params![Utc::now().to_rfc3339(), id],
1459    )?;
1460    Ok(())
1461}
1462
1463/// Mark a node as projected via PR.
1464pub fn mark_node_projected_with_pr(conn: &Connection, id: &str, pr_url: &str) -> Result<(), CoreError> {
1465    conn.execute(
1466        "UPDATE nodes SET projected_at = ?1, pr_url = ?2 WHERE id = ?3",
1467        params![Utc::now().to_rfc3339(), pr_url, id],
1468    )?;
1469    Ok(())
1470}
1471
1472/// Get all nodes with an associated PR URL.
1473pub fn get_nodes_with_pr(conn: &Connection) -> Result<Vec<KnowledgeNode>, CoreError> {
1474    let mut stmt = conn.prepare(
1475        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1476         FROM nodes WHERE pr_url IS NOT NULL",
1477    )?;
1478    let rows = stmt.query_map([], |row| {
1479        Ok(KnowledgeNode {
1480            id: row.get(0)?,
1481            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1482            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1483            project_id: row.get(3)?,
1484            content: row.get(4)?,
1485            confidence: row.get(5)?,
1486            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1487            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1488                .unwrap_or_default()
1489                .with_timezone(&Utc),
1490            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1491                .unwrap_or_default()
1492                .with_timezone(&Utc),
1493            projected_at: row.get(9)?,
1494            pr_url: row.get(10)?,
1495        })
1496    })?;
1497    let mut nodes = Vec::new();
1498    for row in rows {
1499        nodes.push(row?);
1500    }
1501    Ok(nodes)
1502}
1503
1504/// Clear PR URL from nodes after merge.
1505pub fn clear_node_pr(conn: &Connection, pr_url: &str) -> Result<(), CoreError> {
1506    conn.execute(
1507        "UPDATE nodes SET pr_url = NULL WHERE pr_url = ?1",
1508        params![pr_url],
1509    )?;
1510    Ok(())
1511}
1512
1513/// Dismiss all nodes for a closed PR.
1514pub fn dismiss_nodes_for_pr(conn: &Connection, pr_url: &str) -> Result<(), CoreError> {
1515    conn.execute(
1516        "UPDATE nodes SET status = 'dismissed', pr_url = NULL WHERE pr_url = ?1",
1517        params![pr_url],
1518    )?;
1519    Ok(())
1520}
1521
1522pub fn get_all_projects(conn: &Connection) -> Result<Vec<KnowledgeProject>, CoreError> {
1523    let mut stmt = conn.prepare(
1524        "SELECT id, path, remote_url, agent_type, last_seen FROM projects ORDER BY last_seen DESC",
1525    )?;
1526    let rows = stmt.query_map([], |row| {
1527        Ok(KnowledgeProject {
1528            id: row.get(0)?,
1529            path: row.get(1)?,
1530            remote_url: row.get(2)?,
1531            agent_type: row.get(3)?,
1532            last_seen: DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)
1533                .unwrap_or_default()
1534                .with_timezone(&Utc),
1535        })
1536    })?;
1537    let mut projects = Vec::new();
1538    for row in rows {
1539        projects.push(row?);
1540    }
1541    Ok(projects)
1542}
1543
1544#[cfg(test)]
1545mod tests {
1546    use super::*;
1547    use crate::models::*;
1548
1549    fn test_db() -> Connection {
1550        let conn = Connection::open_in_memory().unwrap();
1551        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
1552        migrate(&conn).unwrap();
1553        conn
1554    }
1555
1556    fn test_pattern(id: &str, description: &str) -> Pattern {
1557        Pattern {
1558            id: id.to_string(),
1559            pattern_type: PatternType::RepetitiveInstruction,
1560            description: description.to_string(),
1561            confidence: 0.85,
1562            times_seen: 1,
1563            first_seen: Utc::now(),
1564            last_seen: Utc::now(),
1565            last_projected: None,
1566            status: PatternStatus::Discovered,
1567            source_sessions: vec!["sess-1".to_string()],
1568            related_files: vec![],
1569            suggested_content: "Always do X".to_string(),
1570            suggested_target: SuggestedTarget::ClaudeMd,
1571            project: Some("/test/project".to_string()),
1572            generation_failed: false,
1573        }
1574    }
1575
1576    #[test]
1577    fn test_insert_and_get_pattern() {
1578        let conn = test_db();
1579        let pattern = test_pattern("pat-1", "Use uv for Python packages");
1580        insert_pattern(&conn, &pattern).unwrap();
1581
1582        let patterns = get_all_patterns(&conn, None).unwrap();
1583        assert_eq!(patterns.len(), 1);
1584        assert_eq!(patterns[0].id, "pat-1");
1585        assert_eq!(patterns[0].description, "Use uv for Python packages");
1586        assert!((patterns[0].confidence - 0.85).abs() < f64::EPSILON);
1587    }
1588
1589    #[test]
1590    fn test_pattern_merge_update() {
1591        let conn = test_db();
1592        let pattern = test_pattern("pat-1", "Use uv for Python packages");
1593        insert_pattern(&conn, &pattern).unwrap();
1594
1595        update_pattern_merge(
1596            &conn,
1597            "pat-1",
1598            &["sess-2".to_string(), "sess-3".to_string()],
1599            0.92,
1600            Utc::now(),
1601            2,
1602        )
1603        .unwrap();
1604
1605        let patterns = get_all_patterns(&conn, None).unwrap();
1606        assert_eq!(patterns[0].times_seen, 3);
1607        assert!((patterns[0].confidence - 0.92).abs() < f64::EPSILON);
1608        assert_eq!(patterns[0].source_sessions.len(), 3);
1609    }
1610
1611    #[test]
1612    fn test_get_patterns_by_status() {
1613        let conn = test_db();
1614        let p1 = test_pattern("pat-1", "Pattern one");
1615        let mut p2 = test_pattern("pat-2", "Pattern two");
1616        p2.status = PatternStatus::Active;
1617        insert_pattern(&conn, &p1).unwrap();
1618        insert_pattern(&conn, &p2).unwrap();
1619
1620        let discovered = get_patterns(&conn, &["discovered"], None).unwrap();
1621        assert_eq!(discovered.len(), 1);
1622        assert_eq!(discovered[0].id, "pat-1");
1623
1624        let active = get_patterns(&conn, &["active"], None).unwrap();
1625        assert_eq!(active.len(), 1);
1626        assert_eq!(active[0].id, "pat-2");
1627
1628        let both = get_patterns(&conn, &["discovered", "active"], None).unwrap();
1629        assert_eq!(both.len(), 2);
1630    }
1631
1632    #[test]
1633    fn test_analyzed_session_tracking() {
1634        let conn = test_db();
1635        assert!(!is_session_analyzed(&conn, "sess-1").unwrap());
1636
1637        record_analyzed_session(&conn, "sess-1", "/test").unwrap();
1638        assert!(is_session_analyzed(&conn, "sess-1").unwrap());
1639        assert!(!is_session_analyzed(&conn, "sess-2").unwrap());
1640    }
1641
1642    #[test]
1643    fn test_sessions_for_analysis() {
1644        let conn = test_db();
1645
1646        // Record an ingested session
1647        let session = IngestedSession {
1648            session_id: "sess-1".to_string(),
1649            project: "/test".to_string(),
1650            session_path: "/tmp/test.jsonl".to_string(),
1651            file_size: 100,
1652            file_mtime: "2026-01-01T00:00:00Z".to_string(),
1653            ingested_at: Utc::now(),
1654        };
1655        record_ingested_session(&conn, &session).unwrap();
1656
1657        // It should appear in sessions for analysis (non-rolling)
1658        let since = Utc::now() - chrono::Duration::days(14);
1659        let pending = get_sessions_for_analysis(&conn, None, &since, false).unwrap();
1660        assert_eq!(pending.len(), 1);
1661
1662        // After marking as analyzed, it should not appear in non-rolling mode
1663        record_analyzed_session(&conn, "sess-1", "/test").unwrap();
1664        let pending = get_sessions_for_analysis(&conn, None, &since, false).unwrap();
1665        assert_eq!(pending.len(), 0);
1666
1667        // But it SHOULD still appear in rolling window mode
1668        let pending = get_sessions_for_analysis(&conn, None, &since, true).unwrap();
1669        assert_eq!(pending.len(), 1);
1670    }
1671
1672    #[test]
1673    fn test_insert_and_check_projection() {
1674        let conn = test_db();
1675        let pattern = test_pattern("pat-1", "Use uv");
1676        insert_pattern(&conn, &pattern).unwrap();
1677
1678        assert!(!has_projection_for_pattern(&conn, "pat-1").unwrap());
1679
1680        let proj = Projection {
1681            id: "proj-1".to_string(),
1682            pattern_id: "pat-1".to_string(),
1683            target_type: "claude_md".to_string(),
1684            target_path: "/test/CLAUDE.md".to_string(),
1685            content: "Always use uv".to_string(),
1686            applied_at: Utc::now(),
1687            pr_url: None,
1688            status: ProjectionStatus::Applied,
1689        };
1690        insert_projection(&conn, &proj).unwrap();
1691
1692        assert!(has_projection_for_pattern(&conn, "pat-1").unwrap());
1693        assert!(!has_projection_for_pattern(&conn, "pat-2").unwrap());
1694    }
1695
1696    #[test]
1697    fn test_update_pattern_status() {
1698        let conn = test_db();
1699        let pattern = test_pattern("pat-1", "Test pattern");
1700        insert_pattern(&conn, &pattern).unwrap();
1701
1702        update_pattern_status(&conn, "pat-1", &PatternStatus::Active).unwrap();
1703        let patterns = get_patterns(&conn, &["active"], None).unwrap();
1704        assert_eq!(patterns.len(), 1);
1705        assert_eq!(patterns[0].id, "pat-1");
1706    }
1707
1708    #[test]
1709    fn test_set_generation_failed() {
1710        let conn = test_db();
1711        let pattern = test_pattern("pat-1", "Test pattern");
1712        insert_pattern(&conn, &pattern).unwrap();
1713
1714        assert!(!get_all_patterns(&conn, None).unwrap()[0].generation_failed);
1715
1716        set_generation_failed(&conn, "pat-1", true).unwrap();
1717        assert!(get_all_patterns(&conn, None).unwrap()[0].generation_failed);
1718
1719        set_generation_failed(&conn, "pat-1", false).unwrap();
1720        assert!(!get_all_patterns(&conn, None).unwrap()[0].generation_failed);
1721    }
1722
1723    #[test]
1724    fn test_projections_nudged_column_defaults_to_zero() {
1725        let conn = test_db();
1726
1727        // Verify the nudged column exists by preparing a statement that references it
1728        conn.prepare("SELECT nudged FROM projections").unwrap();
1729
1730        // Insert a projection without specifying nudged — should default to 0
1731        let pattern = test_pattern("pat-1", "Test pattern");
1732        insert_pattern(&conn, &pattern).unwrap();
1733
1734        let proj = Projection {
1735            id: "proj-1".to_string(),
1736            pattern_id: "pat-1".to_string(),
1737            target_type: "claude_md".to_string(),
1738            target_path: "/test/CLAUDE.md".to_string(),
1739            content: "Always use uv".to_string(),
1740            applied_at: Utc::now(),
1741            pr_url: None,
1742            status: ProjectionStatus::Applied,
1743        };
1744        insert_projection(&conn, &proj).unwrap();
1745
1746        let nudged: i64 = conn
1747            .query_row(
1748                "SELECT nudged FROM projections WHERE id = 'proj-1'",
1749                [],
1750                |row| row.get(0),
1751            )
1752            .unwrap();
1753        assert_eq!(nudged, 0, "nudged column should default to 0");
1754    }
1755
1756    // ── Tests for auto-apply pipeline DB functions ──
1757
1758    #[test]
1759    fn test_last_applied_at_empty() {
1760        let conn = test_db();
1761        let result = last_applied_at(&conn).unwrap();
1762        assert_eq!(result, None);
1763    }
1764
1765    #[test]
1766    fn test_last_applied_at_returns_max() {
1767        let conn = test_db();
1768
1769        // Insert two patterns to serve as FK targets
1770        let p1 = test_pattern("pat-1", "Pattern one");
1771        let p2 = test_pattern("pat-2", "Pattern two");
1772        insert_pattern(&conn, &p1).unwrap();
1773        insert_pattern(&conn, &p2).unwrap();
1774
1775        // Insert projections with different timestamps
1776        let earlier = chrono::DateTime::parse_from_rfc3339("2026-01-10T00:00:00Z")
1777            .unwrap()
1778            .with_timezone(&Utc);
1779        let later = chrono::DateTime::parse_from_rfc3339("2026-02-15T12:00:00Z")
1780            .unwrap()
1781            .with_timezone(&Utc);
1782
1783        let proj1 = Projection {
1784            id: "proj-1".to_string(),
1785            pattern_id: "pat-1".to_string(),
1786            target_type: "Skill".to_string(),
1787            target_path: "/path/a".to_string(),
1788            content: "content a".to_string(),
1789            applied_at: earlier,
1790            pr_url: None,
1791            status: ProjectionStatus::Applied,
1792        };
1793        let proj2 = Projection {
1794            id: "proj-2".to_string(),
1795            pattern_id: "pat-2".to_string(),
1796            target_type: "Skill".to_string(),
1797            target_path: "/path/b".to_string(),
1798            content: "content b".to_string(),
1799            applied_at: later,
1800            pr_url: None,
1801            status: ProjectionStatus::Applied,
1802        };
1803        insert_projection(&conn, &proj1).unwrap();
1804        insert_projection(&conn, &proj2).unwrap();
1805
1806        let result = last_applied_at(&conn).unwrap();
1807        assert!(result.is_some());
1808        // The max should be the later timestamp
1809        let max_ts = result.unwrap();
1810        assert!(max_ts.contains("2026-02-15"), "Expected later timestamp, got: {}", max_ts);
1811    }
1812
1813    #[test]
1814    fn test_has_unanalyzed_sessions_empty() {
1815        let conn = test_db();
1816        assert!(!has_unanalyzed_sessions(&conn).unwrap());
1817    }
1818
1819    #[test]
1820    fn test_has_unanalyzed_sessions_with_new_session() {
1821        let conn = test_db();
1822
1823        let session = IngestedSession {
1824            session_id: "sess-1".to_string(),
1825            project: "/test".to_string(),
1826            session_path: "/tmp/test.jsonl".to_string(),
1827            file_size: 100,
1828            file_mtime: "2026-01-01T00:00:00Z".to_string(),
1829            ingested_at: Utc::now(),
1830        };
1831        record_ingested_session(&conn, &session).unwrap();
1832
1833        assert!(has_unanalyzed_sessions(&conn).unwrap());
1834    }
1835
1836    #[test]
1837    fn test_has_unanalyzed_sessions_after_analysis() {
1838        let conn = test_db();
1839
1840        let session = IngestedSession {
1841            session_id: "sess-1".to_string(),
1842            project: "/test".to_string(),
1843            session_path: "/tmp/test.jsonl".to_string(),
1844            file_size: 100,
1845            file_mtime: "2026-01-01T00:00:00Z".to_string(),
1846            ingested_at: Utc::now(),
1847        };
1848        record_ingested_session(&conn, &session).unwrap();
1849        record_analyzed_session(&conn, "sess-1", "/test").unwrap();
1850
1851        assert!(!has_unanalyzed_sessions(&conn).unwrap());
1852    }
1853
1854    #[test]
1855    fn test_has_unprojected_patterns_empty() {
1856        let conn = test_db();
1857        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1858    }
1859
1860    #[test]
1861    fn test_has_unprojected_patterns_with_discovered() {
1862        let conn = test_db();
1863
1864        let pattern = test_pattern("pat-1", "Use uv for Python");
1865        insert_pattern(&conn, &pattern).unwrap();
1866
1867        assert!(has_unprojected_patterns(&conn, 0.0).unwrap());
1868    }
1869
1870    #[test]
1871    fn test_has_unprojected_patterns_after_projection() {
1872        let conn = test_db();
1873
1874        let pattern = test_pattern("pat-1", "Use uv for Python");
1875        insert_pattern(&conn, &pattern).unwrap();
1876
1877        let proj = Projection {
1878            id: "proj-1".to_string(),
1879            pattern_id: "pat-1".to_string(),
1880            target_type: "Skill".to_string(),
1881            target_path: "/path".to_string(),
1882            content: "content".to_string(),
1883            applied_at: Utc::now(),
1884            pr_url: Some("https://github.com/test/pull/1".to_string()),
1885            status: ProjectionStatus::Applied,
1886        };
1887        insert_projection(&conn, &proj).unwrap();
1888
1889        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1890    }
1891
1892    #[test]
1893    fn test_has_unprojected_patterns_excludes_generation_failed() {
1894        let conn = test_db();
1895
1896        let pattern = test_pattern("pat-1", "Use uv for Python");
1897        insert_pattern(&conn, &pattern).unwrap();
1898        set_generation_failed(&conn, "pat-1", true).unwrap();
1899
1900        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1901    }
1902
1903    #[test]
1904    fn test_has_unprojected_patterns_excludes_dbonly() {
1905        let conn = test_db();
1906
1907        let mut pattern = test_pattern("pat-1", "Internal tracking only");
1908        pattern.suggested_target = SuggestedTarget::DbOnly;
1909        insert_pattern(&conn, &pattern).unwrap();
1910
1911        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1912    }
1913
1914    #[test]
1915    fn test_auto_apply_data_triggers_full_flow() {
1916        let conn = test_db();
1917
1918        // Initially: no data, no triggers
1919        assert!(!has_unanalyzed_sessions(&conn).unwrap());
1920        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1921
1922        // Step 1: Ingest creates sessions → triggers analyze
1923        let session = IngestedSession {
1924            session_id: "sess-1".to_string(),
1925            project: "/proj".to_string(),
1926            session_path: "/path/sess".to_string(),
1927            file_size: 100,
1928            file_mtime: "2025-01-01T00:00:00Z".to_string(),
1929            ingested_at: Utc::now(),
1930        };
1931        record_ingested_session(&conn, &session).unwrap();
1932        assert!(has_unanalyzed_sessions(&conn).unwrap());
1933
1934        // Step 2: After analysis → sessions marked, patterns created → triggers apply
1935        record_analyzed_session(&conn, "sess-1", "/proj").unwrap();
1936        assert!(!has_unanalyzed_sessions(&conn).unwrap());
1937
1938        let p = test_pattern("pat-1", "Always use cargo fmt");
1939        insert_pattern(&conn, &p).unwrap();
1940        assert!(has_unprojected_patterns(&conn, 0.0).unwrap());
1941
1942        // Step 3: After apply → projection created with PR URL
1943        let proj = Projection {
1944            id: "proj-1".to_string(),
1945            pattern_id: "pat-1".to_string(),
1946            target_type: "Skill".to_string(),
1947            target_path: "/skills/cargo-fmt.md".to_string(),
1948            content: "skill content".to_string(),
1949            applied_at: Utc::now(),
1950            pr_url: Some("https://github.com/test/pull/42".to_string()),
1951            status: ProjectionStatus::Applied,
1952        };
1953        insert_projection(&conn, &proj).unwrap();
1954        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1955    }
1956
1957    #[test]
1958    fn test_get_last_nudge_at_empty() {
1959        let conn = test_db();
1960        assert!(get_last_nudge_at(&conn).unwrap().is_none());
1961    }
1962
1963    #[test]
1964    fn test_unanalyzed_session_count() {
1965        let conn = test_db();
1966        assert_eq!(unanalyzed_session_count(&conn).unwrap(), 0);
1967
1968        // Add 3 sessions
1969        for i in 1..=3 {
1970            let session = IngestedSession {
1971                session_id: format!("sess-{i}"),
1972                project: "/proj".to_string(),
1973                session_path: format!("/path/sess-{i}"),
1974                file_size: 100,
1975                file_mtime: "2025-01-01T00:00:00Z".to_string(),
1976                ingested_at: Utc::now(),
1977            };
1978            record_ingested_session(&conn, &session).unwrap();
1979        }
1980        assert_eq!(unanalyzed_session_count(&conn).unwrap(), 3);
1981
1982        // Analyze one
1983        record_analyzed_session(&conn, "sess-1", "/proj").unwrap();
1984        assert_eq!(unanalyzed_session_count(&conn).unwrap(), 2);
1985    }
1986
1987    #[test]
1988    fn test_set_and_get_last_nudge_at() {
1989        let conn = test_db();
1990        let now = Utc::now();
1991        set_last_nudge_at(&conn, &now).unwrap();
1992        let result = get_last_nudge_at(&conn).unwrap().unwrap();
1993        // Compare to second precision (DB stores RFC 3339)
1994        assert_eq!(
1995            result.format("%Y-%m-%dT%H:%M:%S").to_string(),
1996            now.format("%Y-%m-%dT%H:%M:%S").to_string()
1997        );
1998    }
1999
2000    #[test]
2001    fn test_projection_status_column_exists() {
2002        let conn = test_db();
2003        let pattern = test_pattern("pat-1", "Test");
2004        insert_pattern(&conn, &pattern).unwrap();
2005
2006        let proj = Projection {
2007            id: "proj-1".to_string(),
2008            pattern_id: "pat-1".to_string(),
2009            target_type: "skill".to_string(),
2010            target_path: "/test/skill.md".to_string(),
2011            content: "content".to_string(),
2012            applied_at: Utc::now(),
2013            pr_url: None,
2014            status: ProjectionStatus::PendingReview,
2015        };
2016        insert_projection(&conn, &proj).unwrap();
2017
2018        let status: String = conn
2019            .query_row(
2020                "SELECT status FROM projections WHERE id = 'proj-1'",
2021                [],
2022                |row| row.get(0),
2023            )
2024            .unwrap();
2025        assert_eq!(status, "pending_review");
2026    }
2027
2028    #[test]
2029    fn test_existing_projections_default_to_applied() {
2030        // Simulate a v2 database with existing projections
2031        let conn = Connection::open_in_memory().unwrap();
2032        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2033
2034        // Create v1 schema manually
2035        conn.execute_batch(
2036            "CREATE TABLE patterns (
2037                id TEXT PRIMARY KEY, pattern_type TEXT NOT NULL, description TEXT NOT NULL,
2038                confidence REAL NOT NULL, times_seen INTEGER NOT NULL DEFAULT 1,
2039                first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, last_projected TEXT,
2040                status TEXT NOT NULL DEFAULT 'discovered', source_sessions TEXT NOT NULL,
2041                related_files TEXT NOT NULL, suggested_content TEXT NOT NULL,
2042                suggested_target TEXT NOT NULL, project TEXT,
2043                generation_failed INTEGER NOT NULL DEFAULT 0
2044            );
2045            CREATE TABLE projections (
2046                id TEXT PRIMARY KEY, pattern_id TEXT NOT NULL REFERENCES patterns(id),
2047                target_type TEXT NOT NULL, target_path TEXT NOT NULL, content TEXT NOT NULL,
2048                applied_at TEXT NOT NULL, pr_url TEXT, nudged INTEGER NOT NULL DEFAULT 0
2049            );
2050            CREATE TABLE analyzed_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, analyzed_at TEXT NOT NULL);
2051            CREATE TABLE ingested_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, session_path TEXT NOT NULL, file_size INTEGER NOT NULL, file_mtime TEXT NOT NULL, ingested_at TEXT NOT NULL);
2052            PRAGMA user_version = 1;",
2053        ).unwrap();
2054
2055        // Insert a pattern first (FK target)
2056        conn.execute(
2057            "INSERT INTO patterns (id, pattern_type, description, confidence, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target)
2058             VALUES ('pat-1', 'workflow_pattern', 'Test', 0.8, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z', 'discovered', '[]', '[]', 'content', 'skill')",
2059            [],
2060        ).unwrap();
2061
2062        // Insert an old-style projection (no status column)
2063        conn.execute(
2064            "INSERT INTO projections (id, pattern_id, target_type, target_path, content, applied_at)
2065             VALUES ('proj-old', 'pat-1', 'skill', '/path', 'content', '2026-01-01T00:00:00Z')",
2066            [],
2067        ).unwrap();
2068
2069        // Now run migration (open_db equivalent)
2070        migrate(&conn).unwrap();
2071
2072        // Old projection should have status = 'applied'
2073        let status: String = conn
2074            .query_row("SELECT status FROM projections WHERE id = 'proj-old'", [], |row| row.get(0))
2075            .unwrap();
2076        assert_eq!(status, "applied");
2077    }
2078
2079    #[test]
2080    fn test_get_pending_review_projections() {
2081        let conn = test_db();
2082        let p1 = test_pattern("pat-1", "Pattern one");
2083        let p2 = test_pattern("pat-2", "Pattern two");
2084        insert_pattern(&conn, &p1).unwrap();
2085        insert_pattern(&conn, &p2).unwrap();
2086
2087        // One pending, one applied
2088        let proj1 = Projection {
2089            id: "proj-1".to_string(),
2090            pattern_id: "pat-1".to_string(),
2091            target_type: "skill".to_string(),
2092            target_path: "/test/a.md".to_string(),
2093            content: "content a".to_string(),
2094            applied_at: Utc::now(),
2095            pr_url: None,
2096            status: ProjectionStatus::PendingReview,
2097        };
2098        let proj2 = Projection {
2099            id: "proj-2".to_string(),
2100            pattern_id: "pat-2".to_string(),
2101            target_type: "skill".to_string(),
2102            target_path: "/test/b.md".to_string(),
2103            content: "content b".to_string(),
2104            applied_at: Utc::now(),
2105            pr_url: None,
2106            status: ProjectionStatus::Applied,
2107        };
2108        insert_projection(&conn, &proj1).unwrap();
2109        insert_projection(&conn, &proj2).unwrap();
2110
2111        let pending = get_pending_review_projections(&conn).unwrap();
2112        assert_eq!(pending.len(), 1);
2113        assert_eq!(pending[0].id, "proj-1");
2114    }
2115
2116    #[test]
2117    fn test_update_projection_status() {
2118        let conn = test_db();
2119        let p = test_pattern("pat-1", "Pattern");
2120        insert_pattern(&conn, &p).unwrap();
2121
2122        let proj = Projection {
2123            id: "proj-1".to_string(),
2124            pattern_id: "pat-1".to_string(),
2125            target_type: "skill".to_string(),
2126            target_path: "/test.md".to_string(),
2127            content: "content".to_string(),
2128            applied_at: Utc::now(),
2129            pr_url: None,
2130            status: ProjectionStatus::PendingReview,
2131        };
2132        insert_projection(&conn, &proj).unwrap();
2133
2134        update_projection_status(&conn, "proj-1", &ProjectionStatus::Applied).unwrap();
2135
2136        let status: String = conn
2137            .query_row("SELECT status FROM projections WHERE id = 'proj-1'", [], |row| row.get(0))
2138            .unwrap();
2139        assert_eq!(status, "applied");
2140    }
2141
2142    #[test]
2143    fn test_delete_projection() {
2144        let conn = test_db();
2145        let p = test_pattern("pat-1", "Pattern");
2146        insert_pattern(&conn, &p).unwrap();
2147
2148        let proj = Projection {
2149            id: "proj-1".to_string(),
2150            pattern_id: "pat-1".to_string(),
2151            target_type: "skill".to_string(),
2152            target_path: "/test.md".to_string(),
2153            content: "content".to_string(),
2154            applied_at: Utc::now(),
2155            pr_url: None,
2156            status: ProjectionStatus::PendingReview,
2157        };
2158        insert_projection(&conn, &proj).unwrap();
2159        assert!(has_projection_for_pattern(&conn, "pat-1").unwrap());
2160
2161        delete_projection(&conn, "proj-1").unwrap();
2162        assert!(!has_projection_for_pattern(&conn, "pat-1").unwrap());
2163    }
2164
2165    #[test]
2166    fn test_get_projections_with_pr_url() {
2167        let conn = test_db();
2168        let p1 = test_pattern("pat-1", "Pattern one");
2169        let p2 = test_pattern("pat-2", "Pattern two");
2170        insert_pattern(&conn, &p1).unwrap();
2171        insert_pattern(&conn, &p2).unwrap();
2172
2173        let proj1 = Projection {
2174            id: "proj-1".to_string(),
2175            pattern_id: "pat-1".to_string(),
2176            target_type: "skill".to_string(),
2177            target_path: "/a.md".to_string(),
2178            content: "a".to_string(),
2179            applied_at: Utc::now(),
2180            pr_url: Some("https://github.com/test/pull/1".to_string()),
2181            status: ProjectionStatus::Applied,
2182        };
2183        let proj2 = Projection {
2184            id: "proj-2".to_string(),
2185            pattern_id: "pat-2".to_string(),
2186            target_type: "skill".to_string(),
2187            target_path: "/b.md".to_string(),
2188            content: "b".to_string(),
2189            applied_at: Utc::now(),
2190            pr_url: None,
2191            status: ProjectionStatus::Applied,
2192        };
2193        insert_projection(&conn, &proj1).unwrap();
2194        insert_projection(&conn, &proj2).unwrap();
2195
2196        let with_pr = get_applied_projections_with_pr(&conn).unwrap();
2197        assert_eq!(with_pr.len(), 1);
2198        assert_eq!(with_pr[0].pr_url, Some("https://github.com/test/pull/1".to_string()));
2199    }
2200
2201    #[test]
2202    fn test_get_projected_pattern_ids_by_status() {
2203        let conn = test_db();
2204        let p1 = test_pattern("pat-1", "Pattern one");
2205        let p2 = test_pattern("pat-2", "Pattern two");
2206        insert_pattern(&conn, &p1).unwrap();
2207        insert_pattern(&conn, &p2).unwrap();
2208
2209        let proj1 = Projection {
2210            id: "proj-1".to_string(),
2211            pattern_id: "pat-1".to_string(),
2212            target_type: "skill".to_string(),
2213            target_path: "/a.md".to_string(),
2214            content: "a".to_string(),
2215            applied_at: Utc::now(),
2216            pr_url: None,
2217            status: ProjectionStatus::Applied,
2218        };
2219        let proj2 = Projection {
2220            id: "proj-2".to_string(),
2221            pattern_id: "pat-2".to_string(),
2222            target_type: "skill".to_string(),
2223            target_path: "/b.md".to_string(),
2224            content: "b".to_string(),
2225            applied_at: Utc::now(),
2226            pr_url: None,
2227            status: ProjectionStatus::PendingReview,
2228        };
2229        insert_projection(&conn, &proj1).unwrap();
2230        insert_projection(&conn, &proj2).unwrap();
2231
2232        let ids = get_projected_pattern_ids_by_status(&conn, &[ProjectionStatus::Applied, ProjectionStatus::PendingReview]).unwrap();
2233        assert_eq!(ids.len(), 2);
2234
2235        let ids_applied_only = get_projected_pattern_ids_by_status(&conn, &[ProjectionStatus::Applied]).unwrap();
2236        assert_eq!(ids_applied_only.len(), 1);
2237        assert!(ids_applied_only.contains("pat-1"));
2238    }
2239
2240    #[test]
2241    fn test_has_unprojected_patterns_excludes_dismissed() {
2242        let conn = test_db();
2243
2244        let mut pattern = test_pattern("pat-1", "Dismissed pattern");
2245        pattern.status = PatternStatus::Dismissed;
2246        insert_pattern(&conn, &pattern).unwrap();
2247
2248        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
2249    }
2250
2251    #[test]
2252    fn test_has_unprojected_patterns_excludes_pending_review() {
2253        let conn = test_db();
2254
2255        let pattern = test_pattern("pat-1", "Pattern with pending review");
2256        insert_pattern(&conn, &pattern).unwrap();
2257
2258        // Create a pending_review projection
2259        let proj = Projection {
2260            id: "proj-1".to_string(),
2261            pattern_id: "pat-1".to_string(),
2262            target_type: "skill".to_string(),
2263            target_path: "/test.md".to_string(),
2264            content: "content".to_string(),
2265            applied_at: Utc::now(),
2266            pr_url: None,
2267            status: ProjectionStatus::PendingReview,
2268        };
2269        insert_projection(&conn, &proj).unwrap();
2270
2271        // Pattern already has a pending_review projection — should NOT be "unprojected"
2272        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
2273    }
2274
2275    #[test]
2276    fn test_v4_migration_creates_tables() {
2277        let conn = Connection::open_in_memory().unwrap();
2278        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2279        migrate(&conn).unwrap();
2280
2281        let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0)).unwrap();
2282        assert_eq!(version, 6);
2283
2284        // Verify nodes table exists with correct columns
2285        let count: i64 = conn.query_row(
2286            "SELECT COUNT(*) FROM nodes WHERE 1=0", [], |row| row.get(0)
2287        ).unwrap();
2288        assert_eq!(count, 0);
2289
2290        // Verify edges table exists
2291        let count: i64 = conn.query_row(
2292            "SELECT COUNT(*) FROM edges WHERE 1=0", [], |row| row.get(0)
2293        ).unwrap();
2294        assert_eq!(count, 0);
2295
2296        // Verify projects table exists
2297        let count: i64 = conn.query_row(
2298            "SELECT COUNT(*) FROM projects WHERE 1=0", [], |row| row.get(0)
2299        ).unwrap();
2300        assert_eq!(count, 0);
2301    }
2302
2303    // ── Node CRUD tests ──
2304
2305    #[test]
2306    fn test_insert_and_get_node() {
2307        let conn = Connection::open_in_memory().unwrap();
2308        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2309        migrate(&conn).unwrap();
2310
2311        let node = KnowledgeNode {
2312            id: "node-1".to_string(),
2313            node_type: NodeType::Rule,
2314            scope: NodeScope::Project,
2315            project_id: Some("my-app".to_string()),
2316            content: "Always run tests".to_string(),
2317            confidence: 0.85,
2318            status: NodeStatus::Active,
2319            created_at: Utc::now(),
2320            updated_at: Utc::now(),
2321            projected_at: None,
2322            pr_url: None,
2323        };
2324
2325        insert_node(&conn, &node).unwrap();
2326        let retrieved = get_node(&conn, "node-1").unwrap().unwrap();
2327        assert_eq!(retrieved.content, "Always run tests");
2328        assert_eq!(retrieved.node_type, NodeType::Rule);
2329        assert_eq!(retrieved.scope, NodeScope::Project);
2330        assert_eq!(retrieved.confidence, 0.85);
2331    }
2332
2333    #[test]
2334    fn test_get_nodes_by_scope_and_status() {
2335        let conn = Connection::open_in_memory().unwrap();
2336        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2337        migrate(&conn).unwrap();
2338
2339        let now = Utc::now();
2340        for (i, scope) in [NodeScope::Global, NodeScope::Project, NodeScope::Global].iter().enumerate() {
2341            let node = KnowledgeNode {
2342                id: format!("node-{i}"),
2343                node_type: NodeType::Rule,
2344                scope: scope.clone(),
2345                project_id: if *scope == NodeScope::Project { Some("my-app".to_string()) } else { None },
2346                content: format!("Rule {i}"),
2347                confidence: 0.8,
2348                status: NodeStatus::Active,
2349                created_at: now,
2350                updated_at: now,
2351                projected_at: None,
2352                pr_url: None,
2353            };
2354            insert_node(&conn, &node).unwrap();
2355        }
2356
2357        let global_nodes = get_nodes_by_scope(&conn, &NodeScope::Global, None, &[NodeStatus::Active]).unwrap();
2358        assert_eq!(global_nodes.len(), 2);
2359
2360        let project_nodes = get_nodes_by_scope(&conn, &NodeScope::Project, Some("my-app"), &[NodeStatus::Active]).unwrap();
2361        assert_eq!(project_nodes.len(), 1);
2362    }
2363
2364    #[test]
2365    fn test_update_node_confidence() {
2366        let conn = Connection::open_in_memory().unwrap();
2367        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2368        migrate(&conn).unwrap();
2369
2370        let node = KnowledgeNode {
2371            id: "node-1".to_string(),
2372            node_type: NodeType::Pattern,
2373            scope: NodeScope::Project,
2374            project_id: Some("my-app".to_string()),
2375            content: "Forgets tests".to_string(),
2376            confidence: 0.5,
2377            status: NodeStatus::Active,
2378            created_at: Utc::now(),
2379            updated_at: Utc::now(),
2380            projected_at: None,
2381            pr_url: None,
2382        };
2383        insert_node(&conn, &node).unwrap();
2384
2385        update_node_confidence(&conn, "node-1", 0.75).unwrap();
2386        let updated = get_node(&conn, "node-1").unwrap().unwrap();
2387        assert_eq!(updated.confidence, 0.75);
2388    }
2389
2390    #[test]
2391    fn test_update_node_status() {
2392        let conn = Connection::open_in_memory().unwrap();
2393        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2394        migrate(&conn).unwrap();
2395
2396        let node = KnowledgeNode {
2397            id: "node-1".to_string(),
2398            node_type: NodeType::Rule,
2399            scope: NodeScope::Global,
2400            project_id: None,
2401            content: "Use snake_case".to_string(),
2402            confidence: 0.9,
2403            status: NodeStatus::PendingReview,
2404            created_at: Utc::now(),
2405            updated_at: Utc::now(),
2406            projected_at: None,
2407            pr_url: None,
2408        };
2409        insert_node(&conn, &node).unwrap();
2410
2411        update_node_status(&conn, "node-1", &NodeStatus::Active).unwrap();
2412        let updated = get_node(&conn, "node-1").unwrap().unwrap();
2413        assert_eq!(updated.status, NodeStatus::Active);
2414    }
2415
2416    #[test]
2417    fn test_v4_migration_from_v3() {
2418        let conn = Connection::open_in_memory().unwrap();
2419        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2420
2421        // Manually create a v3-state database (v1+v2+v3 tables, user_version=3)
2422        conn.execute_batch("
2423            CREATE TABLE patterns (id TEXT PRIMARY KEY, pattern_type TEXT NOT NULL, description TEXT NOT NULL, confidence REAL NOT NULL, times_seen INTEGER NOT NULL DEFAULT 1, first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, last_projected TEXT, status TEXT NOT NULL DEFAULT 'discovered', source_sessions TEXT NOT NULL, related_files TEXT NOT NULL, suggested_content TEXT NOT NULL, suggested_target TEXT NOT NULL, project TEXT, generation_failed INTEGER NOT NULL DEFAULT 0);
2424            CREATE TABLE projections (id TEXT PRIMARY KEY, pattern_id TEXT NOT NULL, target_type TEXT NOT NULL, target_path TEXT NOT NULL, content TEXT NOT NULL, applied_at TEXT NOT NULL, pr_url TEXT, nudged INTEGER NOT NULL DEFAULT 0, status TEXT NOT NULL DEFAULT 'applied');
2425            CREATE TABLE analyzed_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, analyzed_at TEXT NOT NULL);
2426            CREATE TABLE ingested_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, session_path TEXT NOT NULL, file_size INTEGER NOT NULL, file_mtime TEXT NOT NULL, ingested_at TEXT NOT NULL);
2427            CREATE TABLE metadata (key TEXT PRIMARY KEY, value TEXT NOT NULL);
2428        ").unwrap();
2429        conn.pragma_update(None, "user_version", 3).unwrap();
2430
2431        // Now run migrate — should only add v4 tables
2432        migrate(&conn).unwrap();
2433
2434        let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0)).unwrap();
2435        assert_eq!(version, 6);
2436
2437        // v4 tables exist
2438        conn.query_row("SELECT COUNT(*) FROM nodes WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2439        conn.query_row("SELECT COUNT(*) FROM edges WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2440        conn.query_row("SELECT COUNT(*) FROM projects WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2441
2442        // Old tables still exist
2443        conn.query_row("SELECT COUNT(*) FROM patterns WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2444    }
2445
2446    // ── Edge CRUD tests ──
2447
2448    #[test]
2449    fn test_insert_and_get_edges() {
2450        let conn = Connection::open_in_memory().unwrap();
2451        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2452        migrate(&conn).unwrap();
2453
2454        let now = Utc::now();
2455        let node1 = KnowledgeNode {
2456            id: "node-1".to_string(), node_type: NodeType::Pattern,
2457            scope: NodeScope::Project, project_id: Some("app".to_string()),
2458            content: "Pattern A".to_string(), confidence: 0.5,
2459            status: NodeStatus::Active, created_at: now, updated_at: now,
2460            projected_at: None, pr_url: None,
2461        };
2462        let node2 = KnowledgeNode {
2463            id: "node-2".to_string(), node_type: NodeType::Rule,
2464            scope: NodeScope::Project, project_id: Some("app".to_string()),
2465            content: "Rule B".to_string(), confidence: 0.8,
2466            status: NodeStatus::Active, created_at: now, updated_at: now,
2467            projected_at: None, pr_url: None,
2468        };
2469        insert_node(&conn, &node1).unwrap();
2470        insert_node(&conn, &node2).unwrap();
2471
2472        let edge = KnowledgeEdge {
2473            source_id: "node-1".to_string(),
2474            target_id: "node-2".to_string(),
2475            edge_type: EdgeType::DerivedFrom,
2476            created_at: now,
2477        };
2478        insert_edge(&conn, &edge).unwrap();
2479
2480        let edges = get_edges_from(&conn, "node-1").unwrap();
2481        assert_eq!(edges.len(), 1);
2482        assert_eq!(edges[0].target_id, "node-2");
2483        assert_eq!(edges[0].edge_type, EdgeType::DerivedFrom);
2484
2485        let edges_to = get_edges_to(&conn, "node-2").unwrap();
2486        assert_eq!(edges_to.len(), 1);
2487        assert_eq!(edges_to[0].source_id, "node-1");
2488    }
2489
2490    #[test]
2491    fn test_supersede_node_archives_old() {
2492        let conn = Connection::open_in_memory().unwrap();
2493        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2494        migrate(&conn).unwrap();
2495
2496        let now = Utc::now();
2497        let old_node = KnowledgeNode {
2498            id: "old".to_string(), node_type: NodeType::Rule,
2499            scope: NodeScope::Global, project_id: None,
2500            content: "Old rule".to_string(), confidence: 0.8,
2501            status: NodeStatus::Active, created_at: now, updated_at: now,
2502            projected_at: None, pr_url: None,
2503        };
2504        let new_node = KnowledgeNode {
2505            id: "new".to_string(), node_type: NodeType::Rule,
2506            scope: NodeScope::Global, project_id: None,
2507            content: "New rule".to_string(), confidence: 0.85,
2508            status: NodeStatus::Active, created_at: now, updated_at: now,
2509            projected_at: None, pr_url: None,
2510        };
2511        insert_node(&conn, &old_node).unwrap();
2512        insert_node(&conn, &new_node).unwrap();
2513
2514        supersede_node(&conn, "new", "old").unwrap();
2515
2516        let old = get_node(&conn, "old").unwrap().unwrap();
2517        assert_eq!(old.status, NodeStatus::Archived);
2518
2519        let edges = get_edges_from(&conn, "new").unwrap();
2520        assert_eq!(edges.len(), 1);
2521        assert_eq!(edges[0].edge_type, EdgeType::Supersedes);
2522        assert_eq!(edges[0].target_id, "old");
2523    }
2524
2525    // ── Project CRUD tests ──
2526
2527    #[test]
2528    fn test_upsert_and_get_project() {
2529        let conn = Connection::open_in_memory().unwrap();
2530        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2531        migrate(&conn).unwrap();
2532
2533        let project = KnowledgeProject {
2534            id: "my-app".to_string(),
2535            path: "/home/user/my-app".to_string(),
2536            remote_url: Some("git@github.com:user/my-app.git".to_string()),
2537            agent_type: "claude_code".to_string(),
2538            last_seen: Utc::now(),
2539        };
2540        upsert_project(&conn, &project).unwrap();
2541
2542        let retrieved = get_project(&conn, "my-app").unwrap().unwrap();
2543        assert_eq!(retrieved.path, "/home/user/my-app");
2544        assert_eq!(retrieved.remote_url.unwrap(), "git@github.com:user/my-app.git");
2545    }
2546
2547    #[test]
2548    fn test_get_project_by_remote_url() {
2549        let conn = Connection::open_in_memory().unwrap();
2550        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2551        migrate(&conn).unwrap();
2552
2553        let project = KnowledgeProject {
2554            id: "my-app".to_string(),
2555            path: "/old/path".to_string(),
2556            remote_url: Some("git@github.com:user/my-app.git".to_string()),
2557            agent_type: "claude_code".to_string(),
2558            last_seen: Utc::now(),
2559        };
2560        upsert_project(&conn, &project).unwrap();
2561
2562        let found = get_project_by_remote_url(&conn, "git@github.com:user/my-app.git").unwrap();
2563        assert!(found.is_some());
2564        assert_eq!(found.unwrap().id, "my-app");
2565    }
2566
2567    #[test]
2568    fn test_get_all_projects() {
2569        let conn = Connection::open_in_memory().unwrap();
2570        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2571        migrate(&conn).unwrap();
2572
2573        for name in ["app-1", "app-2"] {
2574            let project = KnowledgeProject {
2575                id: name.to_string(),
2576                path: format!("/home/{name}"),
2577                remote_url: None,
2578                agent_type: "claude_code".to_string(),
2579                last_seen: Utc::now(),
2580            };
2581            upsert_project(&conn, &project).unwrap();
2582        }
2583
2584        let projects = get_all_projects(&conn).unwrap();
2585        assert_eq!(projects.len(), 2);
2586    }
2587
2588    #[test]
2589    fn test_generate_project_slug() {
2590        assert_eq!(generate_project_slug("/home/user/my-rust-app"), "my-rust-app");
2591        assert_eq!(generate_project_slug("/home/user/My App"), "my-app");
2592        assert_eq!(generate_project_slug("/"), "unnamed-project");
2593    }
2594
2595    #[test]
2596    fn test_migrate_patterns_to_nodes() {
2597        let conn = Connection::open_in_memory().unwrap();
2598        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2599        migrate(&conn).unwrap();
2600
2601        // Insert v1 patterns
2602        let now = Utc::now().to_rfc3339();
2603        conn.execute(
2604            "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
2605             VALUES (?1, ?2, ?3, ?4, 2, ?5, ?5, 'active', '[]', '[]', 'content', ?6, ?7, 0)",
2606            params!["p1", "repetitive_instruction", "Always run tests", 0.85, &now, "claude_md", "my-app"],
2607        ).unwrap();
2608        conn.execute(
2609            "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
2610             VALUES (?1, ?2, ?3, ?4, 1, ?5, ?5, 'discovered', '[]', '[]', 'content', ?6, ?7, 0)",
2611            params!["p2", "recurring_mistake", "Forgets imports", 0.6, &now, "skill", "my-app"],
2612        ).unwrap();
2613        conn.execute(
2614            "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
2615             VALUES (?1, ?2, ?3, ?4, 3, ?5, ?5, 'active', '[]', '[]', 'Always use snake_case', ?6, ?7, 0)",
2616            params!["p3", "repetitive_instruction", "Always use snake_case", 0.9, &now, "claude_md", "my-app"],
2617        ).unwrap();
2618
2619        let count = migrate_patterns_to_nodes(&conn).unwrap();
2620        assert_eq!(count, 3);
2621
2622        // p1: RepetitiveInstruction + ClaudeMd -> rule
2623        let node1 = get_node(&conn, "migrated-p1").unwrap().unwrap();
2624        assert_eq!(node1.node_type, NodeType::Rule);
2625        assert_eq!(node1.scope, NodeScope::Project);
2626
2627        // p2: RecurringMistake -> pattern
2628        let node2 = get_node(&conn, "migrated-p2").unwrap().unwrap();
2629        assert_eq!(node2.node_type, NodeType::Pattern);
2630
2631        // p3: confidence >= 0.85 + "always" in content -> directive (override)
2632        let node3 = get_node(&conn, "migrated-p3").unwrap().unwrap();
2633        assert_eq!(node3.node_type, NodeType::Directive);
2634    }
2635
2636    #[test]
2637    fn test_apply_graph_operations() {
2638        let conn = Connection::open_in_memory().unwrap();
2639        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2640        migrate(&conn).unwrap();
2641
2642        let ops = vec![
2643            GraphOperation::CreateNode {
2644                node_type: NodeType::Rule,
2645                scope: NodeScope::Project,
2646                project_id: Some("my-app".to_string()),
2647                content: "Always run tests".to_string(),
2648                confidence: 0.85,
2649            },
2650            GraphOperation::CreateNode {
2651                node_type: NodeType::Pattern,
2652                scope: NodeScope::Global,
2653                project_id: None,
2654                content: "Prefers TDD".to_string(),
2655                confidence: 0.6,
2656            },
2657        ];
2658
2659        let result = apply_graph_operations(&conn, &ops).unwrap();
2660        assert_eq!(result.nodes_created, 2);
2661
2662        let nodes = get_nodes_by_scope(&conn, &NodeScope::Project, Some("my-app"), &[NodeStatus::Active]).unwrap();
2663        assert_eq!(nodes.len(), 1);
2664        assert_eq!(nodes[0].content, "Always run tests");
2665    }
2666
2667    #[test]
2668    fn test_apply_graph_operations_update() {
2669        let conn = Connection::open_in_memory().unwrap();
2670        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2671        migrate(&conn).unwrap();
2672
2673        let node = KnowledgeNode {
2674            id: "node-1".to_string(),
2675            node_type: NodeType::Pattern,
2676            scope: NodeScope::Project,
2677            project_id: Some("app".to_string()),
2678            content: "Old content".to_string(),
2679            confidence: 0.5,
2680            status: NodeStatus::Active,
2681            created_at: Utc::now(),
2682            updated_at: Utc::now(),
2683            projected_at: None,
2684            pr_url: None,
2685        };
2686        insert_node(&conn, &node).unwrap();
2687
2688        let ops = vec![
2689            GraphOperation::UpdateNode {
2690                id: "node-1".to_string(),
2691                confidence: Some(0.8),
2692                content: Some("Updated content".to_string()),
2693            },
2694        ];
2695
2696        let result = apply_graph_operations(&conn, &ops).unwrap();
2697        assert_eq!(result.nodes_updated, 1);
2698
2699        let updated = get_node(&conn, "node-1").unwrap().unwrap();
2700        assert_eq!(updated.confidence, 0.8);
2701        assert_eq!(updated.content, "Updated content");
2702    }
2703
2704    #[test]
2705    fn test_get_nodes_by_status() {
2706        let conn = test_db();
2707
2708        let active_node = KnowledgeNode {
2709            id: "n1".to_string(),
2710            node_type: NodeType::Rule,
2711            scope: NodeScope::Project,
2712            project_id: Some("my-app".to_string()),
2713            content: "Always run tests".to_string(),
2714            confidence: 0.85,
2715            status: NodeStatus::Active,
2716            created_at: Utc::now(),
2717            updated_at: Utc::now(),
2718            projected_at: None,
2719            pr_url: None,
2720        };
2721        let pending_node = KnowledgeNode {
2722            id: "n2".to_string(),
2723            node_type: NodeType::Directive,
2724            scope: NodeScope::Global,
2725            project_id: None,
2726            content: "Use snake_case".to_string(),
2727            confidence: 0.9,
2728            status: NodeStatus::PendingReview,
2729            created_at: Utc::now(),
2730            updated_at: Utc::now(),
2731            projected_at: None,
2732            pr_url: None,
2733        };
2734        let dismissed_node = KnowledgeNode {
2735            id: "n3".to_string(),
2736            node_type: NodeType::Pattern,
2737            scope: NodeScope::Project,
2738            project_id: Some("my-app".to_string()),
2739            content: "Old pattern".to_string(),
2740            confidence: 0.5,
2741            status: NodeStatus::Dismissed,
2742            created_at: Utc::now(),
2743            updated_at: Utc::now(),
2744            projected_at: None,
2745            pr_url: None,
2746        };
2747
2748        insert_node(&conn, &active_node).unwrap();
2749        insert_node(&conn, &pending_node).unwrap();
2750        insert_node(&conn, &dismissed_node).unwrap();
2751
2752        let pending = get_nodes_by_status(&conn, &NodeStatus::PendingReview).unwrap();
2753        assert_eq!(pending.len(), 1);
2754        assert_eq!(pending[0].id, "n2");
2755
2756        let active = get_nodes_by_status(&conn, &NodeStatus::Active).unwrap();
2757        assert_eq!(active.len(), 1);
2758        assert_eq!(active[0].id, "n1");
2759
2760        let pending2 = KnowledgeNode {
2761            id: "n4".to_string(),
2762            node_type: NodeType::Rule,
2763            scope: NodeScope::Project,
2764            project_id: Some("other".to_string()),
2765            content: "Second pending".to_string(),
2766            confidence: 0.95,
2767            status: NodeStatus::PendingReview,
2768            created_at: Utc::now(),
2769            updated_at: Utc::now(),
2770            projected_at: None,
2771            pr_url: None,
2772        };
2773        insert_node(&conn, &pending2).unwrap();
2774        let pending_all = get_nodes_by_status(&conn, &NodeStatus::PendingReview).unwrap();
2775        assert_eq!(pending_all.len(), 2);
2776        assert_eq!(pending_all[0].id, "n4"); // Higher confidence first
2777    }
2778
2779    #[test]
2780    fn test_migrate_v4_to_v5_adds_projection_columns() {
2781        let conn = Connection::open_in_memory().unwrap();
2782        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2783        migrate(&conn).unwrap();
2784
2785        // Insert a node — should support new columns
2786        let node = KnowledgeNode {
2787            id: "test-1".to_string(),
2788            node_type: NodeType::Rule,
2789            scope: NodeScope::Global,
2790            project_id: None,
2791            content: "test rule".to_string(),
2792            confidence: 0.8,
2793            status: NodeStatus::Active,
2794            created_at: Utc::now(),
2795            updated_at: Utc::now(),
2796            projected_at: None,
2797            pr_url: None,
2798        };
2799        insert_node(&conn, &node).unwrap();
2800
2801        let retrieved = get_node(&conn, "test-1").unwrap().unwrap();
2802        assert!(retrieved.projected_at.is_none());
2803        assert!(retrieved.pr_url.is_none());
2804
2805        // Verify schema version
2806        let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0)).unwrap();
2807        assert_eq!(version, 6);
2808    }
2809
2810    fn test_node(id: &str, status: NodeStatus, projected_at: Option<String>, pr_url: Option<String>) -> KnowledgeNode {
2811        KnowledgeNode {
2812            id: id.to_string(),
2813            node_type: NodeType::Rule,
2814            scope: NodeScope::Global,
2815            project_id: None,
2816            content: format!("Content for {}", id),
2817            confidence: 0.8,
2818            status,
2819            created_at: Utc::now(),
2820            updated_at: Utc::now(),
2821            projected_at,
2822            pr_url,
2823        }
2824    }
2825
2826    #[test]
2827    fn test_get_unprojected_nodes() {
2828        let conn = test_db();
2829
2830        // Active node with no projected_at — should be returned
2831        let active_unprojected = test_node("n1", NodeStatus::Active, None, None);
2832        // Active node with projected_at set — should NOT be returned
2833        let active_projected = test_node("n2", NodeStatus::Active, Some(Utc::now().to_rfc3339()), None);
2834        // PendingReview node with no projected_at — should NOT be returned (wrong status)
2835        let pending = test_node("n3", NodeStatus::PendingReview, None, None);
2836
2837        insert_node(&conn, &active_unprojected).unwrap();
2838        insert_node(&conn, &active_projected).unwrap();
2839        insert_node(&conn, &pending).unwrap();
2840
2841        let nodes = get_unprojected_nodes(&conn).unwrap();
2842        assert_eq!(nodes.len(), 1);
2843        assert_eq!(nodes[0].id, "n1");
2844    }
2845
2846    #[test]
2847    fn test_mark_node_projected() {
2848        let conn = test_db();
2849
2850        let node = test_node("n1", NodeStatus::Active, None, None);
2851        insert_node(&conn, &node).unwrap();
2852
2853        mark_node_projected(&conn, "n1").unwrap();
2854
2855        let retrieved = get_node(&conn, "n1").unwrap().unwrap();
2856        assert!(retrieved.projected_at.is_some());
2857        assert!(retrieved.pr_url.is_none());
2858    }
2859
2860    #[test]
2861    fn test_mark_node_projected_with_pr() {
2862        let conn = test_db();
2863
2864        let node = test_node("n1", NodeStatus::Active, None, None);
2865        insert_node(&conn, &node).unwrap();
2866
2867        mark_node_projected_with_pr(&conn, "n1", "https://github.com/test/pull/42").unwrap();
2868
2869        let retrieved = get_node(&conn, "n1").unwrap().unwrap();
2870        assert!(retrieved.projected_at.is_some());
2871        assert_eq!(retrieved.pr_url, Some("https://github.com/test/pull/42".to_string()));
2872    }
2873
2874    #[test]
2875    fn test_dismiss_nodes_for_pr() {
2876        let conn = test_db();
2877
2878        let pr_url = "https://github.com/test/pull/99";
2879        let node1 = test_node("n1", NodeStatus::Active, Some(Utc::now().to_rfc3339()), Some(pr_url.to_string()));
2880        let node2 = test_node("n2", NodeStatus::Active, Some(Utc::now().to_rfc3339()), Some(pr_url.to_string()));
2881
2882        insert_node(&conn, &node1).unwrap();
2883        insert_node(&conn, &node2).unwrap();
2884
2885        dismiss_nodes_for_pr(&conn, pr_url).unwrap();
2886
2887        let n1 = get_node(&conn, "n1").unwrap().unwrap();
2888        let n2 = get_node(&conn, "n2").unwrap().unwrap();
2889
2890        assert_eq!(n1.status, NodeStatus::Dismissed);
2891        assert!(n1.pr_url.is_none());
2892        assert_eq!(n2.status, NodeStatus::Dismissed);
2893        assert!(n2.pr_url.is_none());
2894    }
2895
2896    #[test]
2897    fn test_clear_node_pr() {
2898        let conn = test_db();
2899
2900        let pr_url = "https://github.com/test/pull/7";
2901        let node = test_node("n1", NodeStatus::Active, Some(Utc::now().to_rfc3339()), Some(pr_url.to_string()));
2902        insert_node(&conn, &node).unwrap();
2903
2904        clear_node_pr(&conn, pr_url).unwrap();
2905
2906        let retrieved = get_node(&conn, "n1").unwrap().unwrap();
2907        assert!(retrieved.pr_url.is_none());
2908        assert_eq!(retrieved.status, NodeStatus::Active);
2909    }
2910}