Skip to main content

retro_core/
db.rs

1use crate::errors::CoreError;
2use crate::models::{
3    IngestedSession, KnowledgeEdge, KnowledgeNode, KnowledgeProject,
4    EdgeType, GraphOperation, NodeScope, NodeStatus, NodeType,
5    Pattern, PatternStatus, PatternType, Projection, ProjectionStatus, SuggestedTarget,
6};
7use chrono::{DateTime, Utc};
8pub use rusqlite::Connection;
9use rusqlite::params;
10use rusqlite::OptionalExtension;
11use std::path::Path;
12
13const SCHEMA_VERSION: u32 = 5;
14
15/// Open (or create) the retro database with WAL mode enabled.
16pub fn open_db(path: &Path) -> Result<Connection, CoreError> {
17    let conn = Connection::open(path)?;
18
19    // Enable WAL mode for concurrent access
20    conn.pragma_update(None, "journal_mode", "WAL")?;
21
22    // Run migrations
23    migrate(&conn)?;
24
25    Ok(conn)
26}
27
28/// Initialize schema on an existing connection (for testing with in-memory DBs).
29pub fn init_db(conn: &Connection) -> Result<(), CoreError> {
30    migrate(conn)
31}
32
33fn migrate(conn: &Connection) -> Result<(), CoreError> {
34    let current_version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0))?;
35
36    if current_version < 1 {
37        conn.execute_batch(
38            "
39            CREATE TABLE IF NOT EXISTS patterns (
40                id TEXT PRIMARY KEY,
41                pattern_type TEXT NOT NULL,
42                description TEXT NOT NULL,
43                confidence REAL NOT NULL,
44                times_seen INTEGER NOT NULL DEFAULT 1,
45                first_seen TEXT NOT NULL,
46                last_seen TEXT NOT NULL,
47                last_projected TEXT,
48                status TEXT NOT NULL DEFAULT 'discovered',
49                source_sessions TEXT NOT NULL,
50                related_files TEXT NOT NULL,
51                suggested_content TEXT NOT NULL,
52                suggested_target TEXT NOT NULL,
53                project TEXT,
54                generation_failed INTEGER NOT NULL DEFAULT 0
55            );
56
57            CREATE TABLE IF NOT EXISTS projections (
58                id TEXT PRIMARY KEY,
59                pattern_id TEXT NOT NULL REFERENCES patterns(id),
60                target_type TEXT NOT NULL,
61                target_path TEXT NOT NULL,
62                content TEXT NOT NULL,
63                applied_at TEXT NOT NULL,
64                pr_url TEXT,
65                nudged INTEGER NOT NULL DEFAULT 0
66            );
67
68            CREATE TABLE IF NOT EXISTS analyzed_sessions (
69                session_id TEXT PRIMARY KEY,
70                project TEXT NOT NULL,
71                analyzed_at TEXT NOT NULL
72            );
73
74            CREATE TABLE IF NOT EXISTS ingested_sessions (
75                session_id TEXT PRIMARY KEY,
76                project TEXT NOT NULL,
77                session_path TEXT NOT NULL,
78                file_size INTEGER NOT NULL,
79                file_mtime TEXT NOT NULL,
80                ingested_at TEXT NOT NULL
81            );
82
83            CREATE INDEX IF NOT EXISTS idx_patterns_status ON patterns(status);
84            CREATE INDEX IF NOT EXISTS idx_patterns_type ON patterns(pattern_type);
85            CREATE INDEX IF NOT EXISTS idx_patterns_target ON patterns(suggested_target);
86            CREATE INDEX IF NOT EXISTS idx_patterns_project ON patterns(project);
87            CREATE INDEX IF NOT EXISTS idx_projections_pattern ON projections(pattern_id);
88            ",
89        )?;
90
91        conn.pragma_update(None, "user_version", 1)?;
92    }
93
94    if current_version < 2 {
95        conn.execute_batch(
96            "
97            CREATE TABLE IF NOT EXISTS metadata (
98                key TEXT PRIMARY KEY,
99                value TEXT NOT NULL
100            );
101            ",
102        )?;
103        conn.pragma_update(None, "user_version", 2)?;
104    }
105
106    if current_version < 3 {
107        conn.execute_batch(
108            "ALTER TABLE projections ADD COLUMN status TEXT NOT NULL DEFAULT 'applied';",
109        )?;
110        conn.pragma_update(None, "user_version", 3)?;
111    }
112
113    if current_version < 4 {
114        conn.execute_batch(
115            "
116            CREATE TABLE IF NOT EXISTS nodes (
117                id TEXT PRIMARY KEY,
118                type TEXT NOT NULL,
119                scope TEXT NOT NULL,
120                project_id TEXT,
121                content TEXT NOT NULL,
122                confidence REAL NOT NULL,
123                status TEXT NOT NULL,
124                created_at TEXT NOT NULL,
125                updated_at TEXT NOT NULL,
126                projected_at TEXT,
127                pr_url TEXT
128            );
129
130            CREATE INDEX IF NOT EXISTS idx_nodes_scope_project ON nodes(scope, project_id, status);
131            CREATE INDEX IF NOT EXISTS idx_nodes_type_status ON nodes(type, status);
132
133            CREATE TABLE IF NOT EXISTS edges (
134                source_id TEXT NOT NULL,
135                target_id TEXT NOT NULL,
136                type TEXT NOT NULL,
137                created_at TEXT NOT NULL,
138                PRIMARY KEY (source_id, target_id, type)
139            );
140
141            CREATE INDEX IF NOT EXISTS idx_edges_target ON edges(target_id, type);
142            CREATE INDEX IF NOT EXISTS idx_edges_source ON edges(source_id, type);
143
144            CREATE TABLE IF NOT EXISTS projects (
145                id TEXT PRIMARY KEY,
146                path TEXT NOT NULL,
147                remote_url TEXT,
148                agent_type TEXT NOT NULL DEFAULT 'claude_code',
149                last_seen TEXT NOT NULL
150            );
151            ",
152        )?;
153
154        // Migrate existing v1 patterns to v2 nodes
155        migrate_patterns_to_nodes(conn)?;
156
157        conn.pragma_update(None, "user_version", 4)?;
158    }
159
160    if current_version < 5 {
161        // For databases upgraded from v4, we need to add the new columns.
162        // For fresh installs, v4 CREATE TABLE already includes them.
163        let has_projected_at: bool = conn
164            .prepare("SELECT projected_at FROM nodes LIMIT 0")
165            .is_ok();
166        if !has_projected_at {
167            conn.execute_batch(
168                "ALTER TABLE nodes ADD COLUMN projected_at TEXT;
169                 ALTER TABLE nodes ADD COLUMN pr_url TEXT;"
170            )?;
171        }
172        conn.pragma_update(None, "user_version", 5)?;
173    }
174
175    if current_version < 6 {
176        // v6: Clean up v1 leftovers now that v2 pipeline is primary.
177        // - Archive all v1 patterns (they've been migrated to nodes in v4)
178        // - Delete pending_review projections (v2 uses nodes table for review)
179        // - Remove bogus project entries with path "/" (from sessions ingested without project context)
180        conn.execute_batch(
181            "UPDATE patterns SET status = 'archived' WHERE status IN ('discovered', 'active');
182             DELETE FROM projections WHERE status = 'pending_review';
183             DELETE FROM projects WHERE path = '/';
184             DELETE FROM ingested_sessions WHERE project = '/';"
185        )?;
186        conn.pragma_update(None, "user_version", 6)?;
187    }
188
189    Ok(())
190}
191
192/// Check if a session has already been ingested and is up-to-date.
193pub fn is_session_ingested(
194    conn: &Connection,
195    session_id: &str,
196    file_size: u64,
197    file_mtime: &str,
198) -> Result<bool, CoreError> {
199    let mut stmt = conn.prepare(
200        "SELECT file_size, file_mtime FROM ingested_sessions WHERE session_id = ?1",
201    )?;
202
203    let result = stmt.query_row(params![session_id], |row| {
204        let size: u64 = row.get(0)?;
205        let mtime: String = row.get(1)?;
206        Ok((size, mtime))
207    });
208
209    match result {
210        Ok((size, mtime)) => Ok(size == file_size && mtime == file_mtime),
211        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
212        Err(e) => Err(CoreError::Database(e.to_string())),
213    }
214}
215
216/// Record a session as ingested.
217pub fn record_ingested_session(
218    conn: &Connection,
219    session: &IngestedSession,
220) -> Result<(), CoreError> {
221    conn.execute(
222        "INSERT OR REPLACE INTO ingested_sessions (session_id, project, session_path, file_size, file_mtime, ingested_at)
223         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
224        params![
225            session.session_id,
226            session.project,
227            session.session_path,
228            session.file_size,
229            session.file_mtime,
230            session.ingested_at.to_rfc3339(),
231        ],
232    )?;
233    Ok(())
234}
235
236/// Get the count of ingested sessions.
237pub fn ingested_session_count(conn: &Connection) -> Result<u64, CoreError> {
238    let count: u64 =
239        conn.query_row("SELECT COUNT(*) FROM ingested_sessions", [], |row| {
240            row.get(0)
241        })?;
242    Ok(count)
243}
244
245/// Get the count of ingested sessions for a specific project.
246pub fn ingested_session_count_for_project(
247    conn: &Connection,
248    project: &str,
249) -> Result<u64, CoreError> {
250    let count: u64 = conn.query_row(
251        "SELECT COUNT(*) FROM ingested_sessions WHERE project = ?1",
252        params![project],
253        |row| row.get(0),
254    )?;
255    Ok(count)
256}
257
258/// Get the count of analyzed sessions.
259pub fn analyzed_session_count(conn: &Connection) -> Result<u64, CoreError> {
260    let count: u64 =
261        conn.query_row("SELECT COUNT(*) FROM analyzed_sessions", [], |row| {
262            row.get(0)
263        })?;
264    Ok(count)
265}
266
267/// Get the count of patterns by status.
268pub fn pattern_count_by_status(conn: &Connection, status: &str) -> Result<u64, CoreError> {
269    let count: u64 = conn.query_row(
270        "SELECT COUNT(*) FROM patterns WHERE status = ?1",
271        params![status],
272        |row| row.get(0),
273    )?;
274    Ok(count)
275}
276
277/// Get the most recent ingestion timestamp.
278pub fn last_ingested_at(conn: &Connection) -> Result<Option<String>, CoreError> {
279    let result = conn.query_row(
280        "SELECT MAX(ingested_at) FROM ingested_sessions",
281        [],
282        |row| row.get::<_, Option<String>>(0),
283    )?;
284    Ok(result)
285}
286
287/// Get the most recent analysis timestamp.
288pub fn last_analyzed_at(conn: &Connection) -> Result<Option<String>, CoreError> {
289    let result = conn.query_row(
290        "SELECT MAX(analyzed_at) FROM analyzed_sessions",
291        [],
292        |row| row.get::<_, Option<String>>(0),
293    )?;
294    Ok(result)
295}
296
297/// Get the most recent projection (apply) timestamp.
298pub fn last_applied_at(conn: &Connection) -> Result<Option<String>, CoreError> {
299    let result = conn.query_row(
300        "SELECT MAX(applied_at) FROM projections",
301        [],
302        |row| row.get::<_, Option<String>>(0),
303    )?;
304    Ok(result)
305}
306
307/// Check if there are ingested sessions that haven't been analyzed yet.
308pub fn has_unanalyzed_sessions(conn: &Connection) -> Result<bool, CoreError> {
309    let count: u64 = conn.query_row(
310        "SELECT COUNT(*) FROM ingested_sessions i
311         LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
312         WHERE a.session_id IS NULL",
313        [],
314        |row| row.get(0),
315    )?;
316    Ok(count > 0)
317}
318
319/// Count ingested sessions that haven't been analyzed yet.
320pub fn unanalyzed_session_count(conn: &Connection) -> Result<u64, CoreError> {
321    let count: u64 = conn.query_row(
322        "SELECT COUNT(*) FROM ingested_sessions i
323         LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
324         WHERE a.session_id IS NULL",
325        [],
326        |row| row.get(0),
327    )?;
328    Ok(count)
329}
330
331/// Check if there are patterns eligible for projection that haven't been projected yet.
332/// Mirrors the gating logic in `get_qualifying_patterns()`: excludes patterns with
333/// generation_failed=true, suggested_target='db_only', or confidence below threshold.
334/// The confidence threshold is the sole quality gate (no times_seen requirement).
335pub fn has_unprojected_patterns(conn: &Connection, confidence_threshold: f64) -> Result<bool, CoreError> {
336    let count: u64 = conn.query_row(
337        "SELECT COUNT(*) FROM patterns p
338         LEFT JOIN projections pr ON p.id = pr.pattern_id
339         WHERE pr.id IS NULL
340         AND p.status IN ('discovered', 'active')
341         AND p.generation_failed = 0
342         AND p.suggested_target != 'db_only'
343         AND p.confidence >= ?1",
344        [confidence_threshold],
345        |row| row.get(0),
346    )?;
347    Ok(count > 0)
348}
349
350/// Get the last nudge timestamp from metadata.
351pub fn get_last_nudge_at(conn: &Connection) -> Result<Option<DateTime<Utc>>, CoreError> {
352    let result: Option<String> = conn
353        .query_row(
354            "SELECT value FROM metadata WHERE key = 'last_nudge_at'",
355            [],
356            |row| row.get(0),
357        )
358        .optional()?;
359
360    match result {
361        Some(s) => match DateTime::parse_from_rfc3339(&s) {
362            Ok(dt) => Ok(Some(dt.with_timezone(&Utc))),
363            Err(_) => Ok(None),
364        },
365        None => Ok(None),
366    }
367}
368
369/// Set the last nudge timestamp in metadata.
370pub fn set_last_nudge_at(conn: &Connection, timestamp: &DateTime<Utc>) -> Result<(), CoreError> {
371    conn.execute(
372        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('last_nudge_at', ?1)",
373        params![timestamp.to_rfc3339()],
374    )?;
375    Ok(())
376}
377
378/// Get a value from the metadata table by key.
379pub fn get_metadata(conn: &Connection, key: &str) -> Result<Option<String>, CoreError> {
380    let result: Option<String> = conn
381        .query_row(
382            "SELECT value FROM metadata WHERE key = ?1",
383            params![key],
384            |row| row.get(0),
385        )
386        .optional()?;
387    Ok(result)
388}
389
390/// Set a value in the metadata table (insert or replace).
391pub fn set_metadata(conn: &Connection, key: &str, value: &str) -> Result<(), CoreError> {
392    conn.execute(
393        "INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)",
394        params![key, value],
395    )?;
396    Ok(())
397}
398
399/// Verify the database is using WAL mode.
400pub fn verify_wal_mode(conn: &Connection) -> Result<bool, CoreError> {
401    let mode: String = conn.pragma_query_value(None, "journal_mode", |row| row.get(0))?;
402    Ok(mode.to_lowercase() == "wal")
403}
404
405/// Get all distinct projects from ingested sessions.
406pub fn list_projects(conn: &Connection) -> Result<Vec<String>, CoreError> {
407    let mut stmt =
408        conn.prepare("SELECT DISTINCT project FROM ingested_sessions ORDER BY project")?;
409    let projects = stmt
410        .query_map([], |row| row.get(0))?
411        .filter_map(|r| r.ok())
412        .collect();
413    Ok(projects)
414}
415
416// ── Pattern operations ──
417
418const PATTERN_COLUMNS: &str = "id, pattern_type, description, confidence, times_seen, first_seen, last_seen, last_projected, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed";
419
420/// Insert a new pattern into the database.
421pub fn insert_pattern(conn: &Connection, pattern: &Pattern) -> Result<(), CoreError> {
422    let source_sessions =
423        serde_json::to_string(&pattern.source_sessions).unwrap_or_else(|_| "[]".to_string());
424    let related_files =
425        serde_json::to_string(&pattern.related_files).unwrap_or_else(|_| "[]".to_string());
426
427    conn.execute(
428        "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, last_projected, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
429         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
430        params![
431            pattern.id,
432            pattern.pattern_type.to_string(),
433            pattern.description,
434            pattern.confidence,
435            pattern.times_seen,
436            pattern.first_seen.to_rfc3339(),
437            pattern.last_seen.to_rfc3339(),
438            pattern.last_projected.map(|t| t.to_rfc3339()),
439            pattern.status.to_string(),
440            source_sessions,
441            related_files,
442            pattern.suggested_content,
443            pattern.suggested_target.to_string(),
444            pattern.project,
445            pattern.generation_failed as i32,
446        ],
447    )?;
448    Ok(())
449}
450
451/// Update an existing pattern with new evidence (merge).
452pub fn update_pattern_merge(
453    conn: &Connection,
454    id: &str,
455    new_sessions: &[String],
456    new_confidence: f64,
457    new_last_seen: DateTime<Utc>,
458    additional_times_seen: i64,
459) -> Result<(), CoreError> {
460    // Load existing source_sessions and merge
461    let existing_sessions: String = conn.query_row(
462        "SELECT source_sessions FROM patterns WHERE id = ?1",
463        params![id],
464        |row| row.get(0),
465    )?;
466
467    let mut sessions: Vec<String> =
468        serde_json::from_str(&existing_sessions).unwrap_or_default();
469    for s in new_sessions {
470        if !sessions.contains(s) {
471            sessions.push(s.clone());
472        }
473    }
474    let merged_sessions = serde_json::to_string(&sessions).unwrap_or_else(|_| "[]".to_string());
475
476    conn.execute(
477        "UPDATE patterns SET
478            confidence = MAX(confidence, ?2),
479            times_seen = times_seen + ?3,
480            last_seen = ?4,
481            source_sessions = ?5
482         WHERE id = ?1",
483        params![
484            id,
485            new_confidence,
486            additional_times_seen,
487            new_last_seen.to_rfc3339(),
488            merged_sessions,
489        ],
490    )?;
491    Ok(())
492}
493
494/// Get patterns filtered by status and optionally by project.
495pub fn get_patterns(
496    conn: &Connection,
497    statuses: &[&str],
498    project: Option<&str>,
499) -> Result<Vec<Pattern>, CoreError> {
500    if statuses.is_empty() {
501        return Ok(Vec::new());
502    }
503
504    let placeholders: Vec<String> = statuses.iter().enumerate().map(|(i, _)| format!("?{}", i + 1)).collect();
505    let status_clause = placeholders.join(", ");
506
507    let (query, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match project {
508        Some(proj) => {
509            let q = format!(
510                "SELECT {PATTERN_COLUMNS}
511                 FROM patterns WHERE status IN ({}) AND (project = ?{} OR project IS NULL)
512                 ORDER BY confidence DESC",
513                status_clause,
514                statuses.len() + 1
515            );
516            let mut p: Vec<Box<dyn rusqlite::types::ToSql>> = statuses.iter().map(|s| Box::new(s.to_string()) as Box<dyn rusqlite::types::ToSql>).collect();
517            p.push(Box::new(proj.to_string()));
518            (q, p)
519        }
520        None => {
521            let q = format!(
522                "SELECT {PATTERN_COLUMNS}
523                 FROM patterns WHERE status IN ({})
524                 ORDER BY confidence DESC",
525                status_clause
526            );
527            let p: Vec<Box<dyn rusqlite::types::ToSql>> = statuses.iter().map(|s| Box::new(s.to_string()) as Box<dyn rusqlite::types::ToSql>).collect();
528            (q, p)
529        }
530    };
531
532    let params_refs: Vec<&dyn rusqlite::types::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
533    let mut stmt = conn.prepare(&query)?;
534    let patterns = stmt
535        .query_map(params_refs.as_slice(), |row| {
536            Ok(read_pattern_row(row))
537        })?
538        .filter_map(|r| r.ok())
539        .collect();
540
541    Ok(patterns)
542}
543
544/// Get all patterns, optionally filtered by project.
545pub fn get_all_patterns(conn: &Connection, project: Option<&str>) -> Result<Vec<Pattern>, CoreError> {
546    let (query, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match project {
547        Some(proj) => {
548            let q = format!(
549                "SELECT {PATTERN_COLUMNS}
550                 FROM patterns WHERE project = ?1 OR project IS NULL
551                 ORDER BY confidence DESC"
552            );
553            (q, vec![Box::new(proj.to_string()) as Box<dyn rusqlite::types::ToSql>])
554        }
555        None => {
556            let q = format!(
557                "SELECT {PATTERN_COLUMNS}
558                 FROM patterns ORDER BY confidence DESC"
559            );
560            (q, vec![])
561        }
562    };
563
564    let params_refs: Vec<&dyn rusqlite::types::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
565    let mut stmt = conn.prepare(&query)?;
566    let patterns = stmt
567        .query_map(params_refs.as_slice(), |row| Ok(read_pattern_row(row)))?
568        .filter_map(|r| r.ok())
569        .collect();
570
571    Ok(patterns)
572}
573
574fn read_pattern_row(row: &rusqlite::Row<'_>) -> Pattern {
575    let source_sessions_str: String = row.get(9).unwrap_or_default();
576    let related_files_str: String = row.get(10).unwrap_or_default();
577    let first_seen_str: String = row.get(5).unwrap_or_default();
578    let last_seen_str: String = row.get(6).unwrap_or_default();
579    let last_projected_str: Option<String> = row.get(7).unwrap_or(None);
580    let gen_failed: i32 = row.get(14).unwrap_or(0);
581
582    Pattern {
583        id: row.get(0).unwrap_or_default(),
584        pattern_type: PatternType::from_str(&row.get::<_, String>(1).unwrap_or_default()),
585        description: row.get(2).unwrap_or_default(),
586        confidence: row.get(3).unwrap_or(0.0),
587        times_seen: row.get(4).unwrap_or(1),
588        first_seen: DateTime::parse_from_rfc3339(&first_seen_str)
589            .map(|d| d.with_timezone(&Utc))
590            .unwrap_or_else(|_| Utc::now()),
591        last_seen: DateTime::parse_from_rfc3339(&last_seen_str)
592            .map(|d| d.with_timezone(&Utc))
593            .unwrap_or_else(|_| Utc::now()),
594        last_projected: last_projected_str
595            .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
596            .map(|d| d.with_timezone(&Utc)),
597        status: PatternStatus::from_str(&row.get::<_, String>(8).unwrap_or_default()),
598        source_sessions: serde_json::from_str(&source_sessions_str).unwrap_or_default(),
599        related_files: serde_json::from_str(&related_files_str).unwrap_or_default(),
600        suggested_content: row.get(11).unwrap_or_default(),
601        suggested_target: SuggestedTarget::from_str(&row.get::<_, String>(12).unwrap_or_default()),
602        project: row.get(13).unwrap_or(None),
603        generation_failed: gen_failed != 0,
604    }
605}
606
607// ── Analyzed session tracking ──
608
609/// Record a session as analyzed.
610pub fn record_analyzed_session(
611    conn: &Connection,
612    session_id: &str,
613    project: &str,
614) -> Result<(), CoreError> {
615    conn.execute(
616        "INSERT OR REPLACE INTO analyzed_sessions (session_id, project, analyzed_at)
617         VALUES (?1, ?2, ?3)",
618        params![session_id, project, Utc::now().to_rfc3339()],
619    )?;
620    Ok(())
621}
622
623/// Check if a session has been analyzed.
624pub fn is_session_analyzed(conn: &Connection, session_id: &str) -> Result<bool, CoreError> {
625    let count: u64 = conn.query_row(
626        "SELECT COUNT(*) FROM analyzed_sessions WHERE session_id = ?1",
627        params![session_id],
628        |row| row.get(0),
629    )?;
630    Ok(count > 0)
631}
632
633/// Get ingested sessions for analysis within the time window.
634/// When `rolling_window` is true, returns ALL sessions in the window (re-analyzes everything).
635/// When false, only returns sessions not yet in `analyzed_sessions` (analyze-once).
636pub fn get_sessions_for_analysis(
637    conn: &Connection,
638    project: Option<&str>,
639    since: &DateTime<Utc>,
640    rolling_window: bool,
641) -> Result<Vec<IngestedSession>, CoreError> {
642    let since_str = since.to_rfc3339();
643
644    let (query, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match (project, rolling_window) {
645        (Some(proj), true) => {
646            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
647                     FROM ingested_sessions i
648                     WHERE i.project = ?1 AND i.ingested_at >= ?2
649                     ORDER BY i.ingested_at".to_string();
650            (q, vec![
651                Box::new(proj.to_string()) as Box<dyn rusqlite::types::ToSql>,
652                Box::new(since_str) as Box<dyn rusqlite::types::ToSql>,
653            ])
654        }
655        (Some(proj), false) => {
656            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
657                     FROM ingested_sessions i
658                     LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
659                     WHERE a.session_id IS NULL AND i.project = ?1 AND i.ingested_at >= ?2
660                     ORDER BY i.ingested_at".to_string();
661            (q, vec![
662                Box::new(proj.to_string()) as Box<dyn rusqlite::types::ToSql>,
663                Box::new(since_str) as Box<dyn rusqlite::types::ToSql>,
664            ])
665        }
666        (None, true) => {
667            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
668                     FROM ingested_sessions i
669                     WHERE i.ingested_at >= ?1
670                     ORDER BY i.ingested_at".to_string();
671            (q, vec![Box::new(since_str) as Box<dyn rusqlite::types::ToSql>])
672        }
673        (None, false) => {
674            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
675                     FROM ingested_sessions i
676                     LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
677                     WHERE a.session_id IS NULL AND i.ingested_at >= ?1
678                     ORDER BY i.ingested_at".to_string();
679            (q, vec![Box::new(since_str) as Box<dyn rusqlite::types::ToSql>])
680        }
681    };
682
683    let params_refs: Vec<&dyn rusqlite::types::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
684    let mut stmt = conn.prepare(&query)?;
685    let sessions = stmt
686        .query_map(params_refs.as_slice(), |row| {
687            let ingested_at_str: String = row.get(5)?;
688            let ingested_at = DateTime::parse_from_rfc3339(&ingested_at_str)
689                .map(|d| d.with_timezone(&Utc))
690                .unwrap_or_else(|_| Utc::now());
691            Ok(IngestedSession {
692                session_id: row.get(0)?,
693                project: row.get(1)?,
694                session_path: row.get(2)?,
695                file_size: row.get(3)?,
696                file_mtime: row.get(4)?,
697                ingested_at,
698            })
699        })?
700        .filter_map(|r| r.ok())
701        .collect();
702
703    Ok(sessions)
704}
705
706// ── Projection operations ──
707
708/// Insert a new projection record.
709pub fn insert_projection(conn: &Connection, proj: &Projection) -> Result<(), CoreError> {
710    conn.execute(
711        "INSERT INTO projections (id, pattern_id, target_type, target_path, content, applied_at, pr_url, status)
712         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
713        params![
714            proj.id,
715            proj.pattern_id,
716            proj.target_type,
717            proj.target_path,
718            proj.content,
719            proj.applied_at.to_rfc3339(),
720            proj.pr_url,
721            proj.status.to_string(),
722        ],
723    )?;
724    Ok(())
725}
726
727/// Check if a pattern already has an active projection.
728pub fn has_projection_for_pattern(conn: &Connection, pattern_id: &str) -> Result<bool, CoreError> {
729    let count: u64 = conn.query_row(
730        "SELECT COUNT(*) FROM projections WHERE pattern_id = ?1",
731        params![pattern_id],
732        |row| row.get(0),
733    )?;
734    Ok(count > 0)
735}
736
737/// Get the set of all pattern IDs that already have projections.
738pub fn get_projected_pattern_ids(
739    conn: &Connection,
740) -> Result<std::collections::HashSet<String>, CoreError> {
741    let mut stmt = conn.prepare("SELECT DISTINCT pattern_id FROM projections")?;
742    let ids = stmt
743        .query_map([], |row| row.get(0))?
744        .filter_map(|r| r.ok())
745        .collect();
746    Ok(ids)
747}
748
749/// Update a pattern's status.
750pub fn update_pattern_status(
751    conn: &Connection,
752    id: &str,
753    status: &PatternStatus,
754) -> Result<(), CoreError> {
755    conn.execute(
756        "UPDATE patterns SET status = ?2 WHERE id = ?1",
757        params![id, status.to_string()],
758    )?;
759    Ok(())
760}
761
762/// Set or clear the generation_failed flag on a pattern.
763pub fn set_generation_failed(
764    conn: &Connection,
765    id: &str,
766    failed: bool,
767) -> Result<(), CoreError> {
768    conn.execute(
769        "UPDATE patterns SET generation_failed = ?2 WHERE id = ?1",
770        params![id, failed as i32],
771    )?;
772    Ok(())
773}
774
775/// Get all projections for active patterns (for staleness detection).
776pub fn get_projections_for_active_patterns(
777    conn: &Connection,
778) -> Result<Vec<Projection>, CoreError> {
779    let mut stmt = conn.prepare(
780        "SELECT p.id, p.pattern_id, p.target_type, p.target_path, p.content, p.applied_at, p.pr_url, p.status
781         FROM projections p
782         INNER JOIN patterns pat ON p.pattern_id = pat.id
783         WHERE pat.status = 'active'",
784    )?;
785
786    let projections = stmt
787        .query_map([], |row| {
788            let applied_at_str: String = row.get(5)?;
789            let applied_at = DateTime::parse_from_rfc3339(&applied_at_str)
790                .map(|d| d.with_timezone(&Utc))
791                .unwrap_or_else(|_| Utc::now());
792            let status_str: String = row.get(7)?;
793            let status = ProjectionStatus::from_str(&status_str)
794                .unwrap_or(ProjectionStatus::Applied);
795            Ok(Projection {
796                id: row.get(0)?,
797                pattern_id: row.get(1)?,
798                target_type: row.get(2)?,
799                target_path: row.get(3)?,
800                content: row.get(4)?,
801                applied_at,
802                pr_url: row.get(6)?,
803                status,
804            })
805        })?
806        .filter_map(|r| r.ok())
807        .collect();
808
809    Ok(projections)
810}
811
812/// Update a pattern's last_projected timestamp to now.
813pub fn update_pattern_last_projected(conn: &Connection, id: &str) -> Result<(), CoreError> {
814    conn.execute(
815        "UPDATE patterns SET last_projected = ?2 WHERE id = ?1",
816        params![id, Utc::now().to_rfc3339()],
817    )?;
818    Ok(())
819}
820
821/// Get all projections with pending_review status.
822pub fn get_pending_review_projections(conn: &Connection) -> Result<Vec<Projection>, CoreError> {
823    let mut stmt = conn.prepare(
824        "SELECT p.id, p.pattern_id, p.target_type, p.target_path, p.content, p.applied_at, p.pr_url, p.status
825         FROM projections p
826         WHERE p.status = 'pending_review'
827         ORDER BY p.applied_at ASC",
828    )?;
829
830    let projections = stmt
831        .query_map([], |row| {
832            let applied_at_str: String = row.get(5)?;
833            let applied_at = DateTime::parse_from_rfc3339(&applied_at_str)
834                .map(|d| d.with_timezone(&Utc))
835                .unwrap_or_else(|_| Utc::now());
836            let status_str: String = row.get(7)?;
837            let status = ProjectionStatus::from_str(&status_str)
838                .unwrap_or(ProjectionStatus::PendingReview);
839            Ok(Projection {
840                id: row.get(0)?,
841                pattern_id: row.get(1)?,
842                target_type: row.get(2)?,
843                target_path: row.get(3)?,
844                content: row.get(4)?,
845                applied_at,
846                pr_url: row.get(6)?,
847                status,
848            })
849        })?
850        .filter_map(|r| r.ok())
851        .collect();
852
853    Ok(projections)
854}
855
856/// Update a projection's status.
857pub fn update_projection_status(
858    conn: &Connection,
859    projection_id: &str,
860    status: &ProjectionStatus,
861) -> Result<(), CoreError> {
862    conn.execute(
863        "UPDATE projections SET status = ?2 WHERE id = ?1",
864        params![projection_id, status.to_string()],
865    )?;
866    Ok(())
867}
868
869/// Delete a projection record.
870pub fn delete_projection(conn: &Connection, projection_id: &str) -> Result<(), CoreError> {
871    conn.execute("DELETE FROM projections WHERE id = ?1", params![projection_id])?;
872    Ok(())
873}
874
875/// Get applied projections that have a PR URL (for sync).
876pub fn get_applied_projections_with_pr(conn: &Connection) -> Result<Vec<Projection>, CoreError> {
877    let mut stmt = conn.prepare(
878        "SELECT p.id, p.pattern_id, p.target_type, p.target_path, p.content, p.applied_at, p.pr_url, p.status
879         FROM projections p
880         WHERE p.status = 'applied' AND p.pr_url IS NOT NULL",
881    )?;
882
883    let projections = stmt
884        .query_map([], |row| {
885            let applied_at_str: String = row.get(5)?;
886            let applied_at = DateTime::parse_from_rfc3339(&applied_at_str)
887                .map(|d| d.with_timezone(&Utc))
888                .unwrap_or_else(|_| Utc::now());
889            let status_str: String = row.get(7)?;
890            let status = ProjectionStatus::from_str(&status_str)
891                .unwrap_or(ProjectionStatus::Applied);
892            Ok(Projection {
893                id: row.get(0)?,
894                pattern_id: row.get(1)?,
895                target_type: row.get(2)?,
896                target_path: row.get(3)?,
897                content: row.get(4)?,
898                applied_at,
899                pr_url: row.get(6)?,
900                status,
901            })
902        })?
903        .filter_map(|r| r.ok())
904        .collect();
905
906    Ok(projections)
907}
908
909/// Get pattern IDs that have projections with specific statuses.
910pub fn get_projected_pattern_ids_by_status(
911    conn: &Connection,
912    statuses: &[ProjectionStatus],
913) -> Result<std::collections::HashSet<String>, CoreError> {
914    if statuses.is_empty() {
915        return Ok(std::collections::HashSet::new());
916    }
917    let placeholders: Vec<String> = statuses.iter().enumerate().map(|(i, _)| format!("?{}", i + 1)).collect();
918    let sql = format!(
919        "SELECT DISTINCT pattern_id FROM projections WHERE status IN ({})",
920        placeholders.join(", ")
921    );
922    let mut stmt = conn.prepare(&sql)?;
923    let params: Vec<String> = statuses.iter().map(|s| s.to_string()).collect();
924    let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|s| s as &dyn rusqlite::types::ToSql).collect();
925    let ids = stmt
926        .query_map(param_refs.as_slice(), |row| row.get(0))?
927        .filter_map(|r| r.ok())
928        .collect();
929    Ok(ids)
930}
931
932/// Update a projection's PR URL.
933pub fn update_projection_pr_url(
934    conn: &Connection,
935    projection_id: &str,
936    pr_url: &str,
937) -> Result<(), CoreError> {
938    conn.execute(
939        "UPDATE projections SET pr_url = ?2 WHERE id = ?1",
940        params![projection_id, pr_url],
941    )?;
942    Ok(())
943}
944
945// ── Knowledge graph: Migration ──
946
947/// Migrate v1 patterns to v2 knowledge nodes. Returns number of nodes created.
948/// Safe to call multiple times — skips patterns that already have corresponding nodes.
949pub fn migrate_patterns_to_nodes(conn: &Connection) -> Result<usize, CoreError> {
950    let mut stmt = conn.prepare(
951        "SELECT id, pattern_type, description, confidence, status, suggested_content, suggested_target, project, first_seen, last_seen
952         FROM patterns",
953    )?;
954    let patterns: Vec<(String, String, String, f64, String, String, String, Option<String>, String, String)> = stmt.query_map([], |row| {
955        Ok((
956            row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?,
957            row.get(4)?, row.get(5)?, row.get(6)?, row.get(7)?,
958            row.get(8)?, row.get(9)?,
959        ))
960    })?.filter_map(|r| r.ok()).collect();
961
962    let mut count = 0;
963    for (id, pattern_type, description, confidence, _status, suggested_content, suggested_target, project, first_seen, last_seen) in &patterns {
964        let node_id = format!("migrated-{id}");
965
966        // Skip if already migrated
967        if get_node(conn, &node_id)?.is_some() {
968            continue;
969        }
970
971        // Determine node type using the spec's deterministic mapping
972        let content_lower = suggested_content.to_lowercase();
973        let has_directive_keyword = content_lower.contains("always") || content_lower.contains("never");
974        let node_type = if *confidence >= 0.85 && has_directive_keyword {
975            NodeType::Directive
976        } else {
977            match (pattern_type.as_str(), suggested_target.as_str()) {
978                ("repetitive_instruction", "claude_md") => NodeType::Rule,
979                ("repetitive_instruction", "skill") => NodeType::Directive,
980                ("recurring_mistake", _) => NodeType::Pattern,
981                ("workflow_pattern", "skill") => NodeType::Skill,
982                ("workflow_pattern", "claude_md") => NodeType::Rule,
983                ("stale_context", _) => NodeType::Memory,
984                ("redundant_context", _) => NodeType::Memory,
985                _ => NodeType::Pattern,
986            }
987        };
988
989        let created_at = DateTime::parse_from_rfc3339(first_seen)
990            .unwrap_or_default()
991            .with_timezone(&Utc);
992        let updated_at = DateTime::parse_from_rfc3339(last_seen)
993            .unwrap_or_default()
994            .with_timezone(&Utc);
995
996        let node = KnowledgeNode {
997            id: node_id,
998            node_type,
999            scope: NodeScope::Project,
1000            project_id: project.clone(),
1001            content: description.clone(),
1002            confidence: *confidence,
1003            status: NodeStatus::Active,
1004            created_at,
1005            updated_at,
1006            projected_at: None,
1007            pr_url: None,
1008        };
1009        insert_node(conn, &node)?;
1010        count += 1;
1011    }
1012    Ok(count)
1013}
1014
1015// ── Knowledge graph: Node operations ──
1016
1017pub fn insert_node(conn: &Connection, node: &KnowledgeNode) -> Result<(), CoreError> {
1018    conn.execute(
1019        "INSERT INTO nodes (id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url)
1020         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
1021        params![
1022            node.id,
1023            node.node_type.to_string(),
1024            node.scope.to_string(),
1025            node.project_id,
1026            node.content,
1027            node.confidence,
1028            node.status.to_string(),
1029            node.created_at.to_rfc3339(),
1030            node.updated_at.to_rfc3339(),
1031            node.projected_at,
1032            node.pr_url,
1033        ],
1034    )?;
1035    Ok(())
1036}
1037
1038pub fn get_node(conn: &Connection, id: &str) -> Result<Option<KnowledgeNode>, CoreError> {
1039    let mut stmt = conn.prepare(
1040        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1041         FROM nodes WHERE id = ?1",
1042    )?;
1043    let result = stmt.query_row(params![id], |row| {
1044        Ok(KnowledgeNode {
1045            id: row.get(0)?,
1046            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1047            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1048            project_id: row.get(3)?,
1049            content: row.get(4)?,
1050            confidence: row.get(5)?,
1051            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1052            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1053                .unwrap_or_default()
1054                .with_timezone(&Utc),
1055            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1056                .unwrap_or_default()
1057                .with_timezone(&Utc),
1058            projected_at: row.get(9)?,
1059            pr_url: row.get(10)?,
1060        })
1061    }).optional()?;
1062    Ok(result)
1063}
1064
1065pub fn get_nodes_by_scope(
1066    conn: &Connection,
1067    scope: &NodeScope,
1068    project_id: Option<&str>,
1069    statuses: &[NodeStatus],
1070) -> Result<Vec<KnowledgeNode>, CoreError> {
1071    if statuses.is_empty() {
1072        return Ok(Vec::new());
1073    }
1074    let status_placeholders: Vec<String> = statuses.iter().enumerate().map(|(i, _)| format!("?{}", i + 3)).collect();
1075    let sql = format!(
1076        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1077         FROM nodes WHERE scope = ?1 AND (?2 IS NULL OR project_id = ?2) AND status IN ({})
1078         ORDER BY confidence DESC",
1079        status_placeholders.join(", ")
1080    );
1081    let mut stmt = conn.prepare(&sql)?;
1082    let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = vec![
1083        Box::new(scope.to_string()),
1084        Box::new(project_id.map(|s| s.to_string())),
1085    ];
1086    for s in statuses {
1087        params_vec.push(Box::new(s.to_string()));
1088    }
1089    let rows = stmt.query_map(rusqlite::params_from_iter(params_vec.iter().map(|p| p.as_ref())), |row| {
1090        Ok(KnowledgeNode {
1091            id: row.get(0)?,
1092            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1093            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1094            project_id: row.get(3)?,
1095            content: row.get(4)?,
1096            confidence: row.get(5)?,
1097            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1098            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1099                .unwrap_or_default()
1100                .with_timezone(&Utc),
1101            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1102                .unwrap_or_default()
1103                .with_timezone(&Utc),
1104            projected_at: row.get(9)?,
1105            pr_url: row.get(10)?,
1106        })
1107    })?;
1108    let mut nodes = Vec::new();
1109    for row in rows {
1110        nodes.push(row?);
1111    }
1112    Ok(nodes)
1113}
1114
1115/// Get all nodes with a given status, ordered by confidence DESC.
1116pub fn get_nodes_by_status(
1117    conn: &Connection,
1118    status: &NodeStatus,
1119) -> Result<Vec<KnowledgeNode>, CoreError> {
1120    let mut stmt = conn.prepare(
1121        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1122         FROM nodes WHERE status = ?1
1123         ORDER BY confidence DESC",
1124    )?;
1125    let rows = stmt.query_map(params![status.to_string()], |row| {
1126        Ok(KnowledgeNode {
1127            id: row.get(0)?,
1128            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1129            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1130            project_id: row.get(3)?,
1131            content: row.get(4)?,
1132            confidence: row.get(5)?,
1133            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1134            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1135                .unwrap_or_default()
1136                .with_timezone(&Utc),
1137            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1138                .unwrap_or_default()
1139                .with_timezone(&Utc),
1140            projected_at: row.get(9)?,
1141            pr_url: row.get(10)?,
1142        })
1143    })?;
1144    let mut nodes = Vec::new();
1145    for row in rows {
1146        nodes.push(row?);
1147    }
1148    Ok(nodes)
1149}
1150
1151pub fn update_node_confidence(conn: &Connection, id: &str, confidence: f64) -> Result<(), CoreError> {
1152    conn.execute(
1153        "UPDATE nodes SET confidence = ?1, updated_at = ?2 WHERE id = ?3",
1154        params![confidence, Utc::now().to_rfc3339(), id],
1155    )?;
1156    Ok(())
1157}
1158
1159pub fn update_node_status(conn: &Connection, id: &str, status: &NodeStatus) -> Result<(), CoreError> {
1160    conn.execute(
1161        "UPDATE nodes SET status = ?1, updated_at = ?2 WHERE id = ?3",
1162        params![status.to_string(), Utc::now().to_rfc3339(), id],
1163    )?;
1164    Ok(())
1165}
1166
1167pub fn update_node_content(conn: &Connection, id: &str, content: &str) -> Result<(), CoreError> {
1168    conn.execute(
1169        "UPDATE nodes SET content = ?1, updated_at = ?2 WHERE id = ?3",
1170        params![content, Utc::now().to_rfc3339(), id],
1171    )?;
1172    Ok(())
1173}
1174
1175// ── Knowledge graph: Edge operations ──
1176
1177pub fn insert_edge(conn: &Connection, edge: &KnowledgeEdge) -> Result<(), CoreError> {
1178    conn.execute(
1179        "INSERT OR IGNORE INTO edges (source_id, target_id, type, created_at)
1180         VALUES (?1, ?2, ?3, ?4)",
1181        params![
1182            edge.source_id,
1183            edge.target_id,
1184            edge.edge_type.to_string(),
1185            edge.created_at.to_rfc3339(),
1186        ],
1187    )?;
1188    Ok(())
1189}
1190
1191pub fn get_edges_from(conn: &Connection, source_id: &str) -> Result<Vec<KnowledgeEdge>, CoreError> {
1192    let mut stmt = conn.prepare(
1193        "SELECT source_id, target_id, type, created_at FROM edges WHERE source_id = ?1",
1194    )?;
1195    let rows = stmt.query_map(params![source_id], |row| {
1196        Ok(KnowledgeEdge {
1197            source_id: row.get(0)?,
1198            target_id: row.get(1)?,
1199            edge_type: EdgeType::from_str(&row.get::<_, String>(2)?).unwrap_or(EdgeType::Supports),
1200            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?)
1201                .unwrap_or_default()
1202                .with_timezone(&Utc),
1203        })
1204    })?;
1205    let mut edges = Vec::new();
1206    for row in rows {
1207        edges.push(row?);
1208    }
1209    Ok(edges)
1210}
1211
1212pub fn get_edges_to(conn: &Connection, target_id: &str) -> Result<Vec<KnowledgeEdge>, CoreError> {
1213    let mut stmt = conn.prepare(
1214        "SELECT source_id, target_id, type, created_at FROM edges WHERE target_id = ?1",
1215    )?;
1216    let rows = stmt.query_map(params![target_id], |row| {
1217        Ok(KnowledgeEdge {
1218            source_id: row.get(0)?,
1219            target_id: row.get(1)?,
1220            edge_type: EdgeType::from_str(&row.get::<_, String>(2)?).unwrap_or(EdgeType::Supports),
1221            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?)
1222                .unwrap_or_default()
1223                .with_timezone(&Utc),
1224        })
1225    })?;
1226    let mut edges = Vec::new();
1227    for row in rows {
1228        edges.push(row?);
1229    }
1230    Ok(edges)
1231}
1232
1233pub fn delete_edge(conn: &Connection, source_id: &str, target_id: &str, edge_type: &EdgeType) -> Result<(), CoreError> {
1234    conn.execute(
1235        "DELETE FROM edges WHERE source_id = ?1 AND target_id = ?2 AND type = ?3",
1236        params![source_id, target_id, edge_type.to_string()],
1237    )?;
1238    Ok(())
1239}
1240
1241/// Mark new_id as superseding old_id: archives old node and creates supersedes edge.
1242pub fn supersede_node(conn: &Connection, new_id: &str, old_id: &str) -> Result<(), CoreError> {
1243    update_node_status(conn, old_id, &NodeStatus::Archived)?;
1244    let edge = KnowledgeEdge {
1245        source_id: new_id.to_string(),
1246        target_id: old_id.to_string(),
1247        edge_type: EdgeType::Supersedes,
1248        created_at: Utc::now(),
1249    };
1250    insert_edge(conn, &edge)?;
1251    Ok(())
1252}
1253
/// Counters describing what a call to `apply_graph_operations` did.
///
/// All counters start at zero via `Default`.
#[derive(Clone, Debug, Default)]
pub struct ApplyGraphResult {
    /// Number of `CreateNode` operations applied.
    pub nodes_created: usize,
    /// Number of `UpdateNode` operations processed.
    pub nodes_updated: usize,
    /// Number of `CreateEdge` operations processed.
    pub edges_created: usize,
    /// Number of `MergeNodes` operations applied.
    pub nodes_merged: usize,
}
1262
1263/// Apply a batch of graph operations to the database.
1264pub fn apply_graph_operations(conn: &Connection, ops: &[GraphOperation]) -> Result<ApplyGraphResult, CoreError> {
1265    let mut result = ApplyGraphResult::default();
1266
1267    for op in ops {
1268        match op {
1269            GraphOperation::CreateNode { node_type, scope, project_id, content, confidence } => {
1270                let node = KnowledgeNode {
1271                    id: uuid::Uuid::new_v4().to_string(),
1272                    node_type: node_type.clone(),
1273                    scope: scope.clone(),
1274                    project_id: project_id.clone(),
1275                    content: content.clone(),
1276                    confidence: *confidence,
1277                    status: NodeStatus::Active,
1278                    created_at: Utc::now(),
1279                    updated_at: Utc::now(),
1280                    projected_at: None,
1281                    pr_url: None,
1282                };
1283                insert_node(conn, &node)?;
1284                result.nodes_created += 1;
1285            }
1286            GraphOperation::UpdateNode { id, confidence, content } => {
1287                if let Some(conf) = confidence {
1288                    update_node_confidence(conn, id, *conf)?;
1289                }
1290                if let Some(cont) = content {
1291                    update_node_content(conn, id, cont)?;
1292                }
1293                result.nodes_updated += 1;
1294            }
1295            GraphOperation::CreateEdge { source_id, target_id, edge_type } => {
1296                let edge = KnowledgeEdge {
1297                    source_id: source_id.clone(),
1298                    target_id: target_id.clone(),
1299                    edge_type: edge_type.clone(),
1300                    created_at: Utc::now(),
1301                };
1302                insert_edge(conn, &edge)?;
1303                result.edges_created += 1;
1304            }
1305            GraphOperation::MergeNodes { keep_id, remove_id } => {
1306                supersede_node(conn, keep_id, remove_id)?;
1307                result.nodes_merged += 1;
1308            }
1309        }
1310    }
1311
1312    Ok(result)
1313}
1314
1315// ── Knowledge graph: Project operations ──
1316
/// Derive a human-readable slug from the last component of a repository path.
///
/// The final path component is lower-cased, every non-alphanumeric character
/// becomes a hyphen, leading/trailing hyphens are removed and runs of hyphens
/// collapse to one. If the path has no usable final component, or nothing
/// alphanumeric survives, the slug is `"unnamed-project"`.
pub fn generate_project_slug(repo_path: &str) -> String {
    const FALLBACK: &str = "unnamed-project";

    let base_name = std::path::Path::new(repo_path)
        .file_name()
        .and_then(|component| component.to_str())
        .unwrap_or(FALLBACK);

    let hyphenated: String = base_name
        .to_lowercase()
        .chars()
        .map(|ch| if ch.is_alphanumeric() { ch } else { '-' })
        .collect();

    // Splitting on '-' and dropping empty pieces both trims the edges and
    // collapses consecutive hyphens in one pass.
    let slug = hyphenated
        .split('-')
        .filter(|segment| !segment.is_empty())
        .collect::<Vec<_>>()
        .join("-");

    if slug.is_empty() {
        FALLBACK.to_string()
    } else {
        slug
    }
}
1350
1351/// Generate a unique project slug, appending -2, -3, etc. if needed.
1352pub fn generate_unique_project_slug(conn: &Connection, repo_path: &str) -> Result<String, CoreError> {
1353    let base = generate_project_slug(repo_path);
1354    if get_project(conn, &base)?.is_none() {
1355        return Ok(base);
1356    }
1357    for i in 2..100 {
1358        let candidate = format!("{base}-{i}");
1359        if get_project(conn, &candidate)?.is_none() {
1360            return Ok(candidate);
1361        }
1362    }
1363    Ok(format!("{base}-{}", &uuid::Uuid::new_v4().to_string()[..8]))
1364}
1365
1366pub fn upsert_project(conn: &Connection, project: &KnowledgeProject) -> Result<(), CoreError> {
1367    conn.execute(
1368        "INSERT INTO projects (id, path, remote_url, agent_type, last_seen)
1369         VALUES (?1, ?2, ?3, ?4, ?5)
1370         ON CONFLICT(id) DO UPDATE SET
1371             path = excluded.path,
1372             remote_url = COALESCE(excluded.remote_url, projects.remote_url),
1373             last_seen = excluded.last_seen",
1374        params![
1375            project.id,
1376            project.path,
1377            project.remote_url,
1378            project.agent_type,
1379            project.last_seen.to_rfc3339(),
1380        ],
1381    )?;
1382    Ok(())
1383}
1384
1385pub fn get_project(conn: &Connection, id: &str) -> Result<Option<KnowledgeProject>, CoreError> {
1386    let mut stmt = conn.prepare(
1387        "SELECT id, path, remote_url, agent_type, last_seen FROM projects WHERE id = ?1",
1388    )?;
1389    let result = stmt.query_row(params![id], |row| {
1390        Ok(KnowledgeProject {
1391            id: row.get(0)?,
1392            path: row.get(1)?,
1393            remote_url: row.get(2)?,
1394            agent_type: row.get(3)?,
1395            last_seen: DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)
1396                .unwrap_or_default()
1397                .with_timezone(&Utc),
1398        })
1399    }).optional()?;
1400    Ok(result)
1401}
1402
1403/// Find an existing project by its filesystem path.
1404pub fn get_project_by_path(conn: &Connection, path: &str) -> Result<Option<KnowledgeProject>, CoreError> {
1405    let mut stmt = conn.prepare(
1406        "SELECT id, path, remote_url, agent_type, last_seen FROM projects WHERE path = ?1",
1407    )?;
1408    let result = stmt.query_row(params![path], |row| {
1409        Ok(KnowledgeProject {
1410            id: row.get(0)?,
1411            path: row.get(1)?,
1412            remote_url: row.get(2)?,
1413            agent_type: row.get(3)?,
1414            last_seen: DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)
1415                .unwrap_or_default()
1416                .with_timezone(&Utc),
1417        })
1418    }).optional()?;
1419    Ok(result)
1420}
1421
1422/// Register a project if not already registered (by path). Returns the project ID.
1423/// If already registered, updates last_seen and remote_url.
1424pub fn ensure_project_registered(conn: &Connection, path: &str) -> Result<String, CoreError> {
1425    // Check if already registered by path
1426    if let Some(existing) = get_project_by_path(conn, path)? {
1427        // Update last_seen
1428        let mut updated = existing.clone();
1429        updated.last_seen = Utc::now();
1430        updated.remote_url = crate::git::remote_url().or(existing.remote_url);
1431        upsert_project(conn, &updated)?;
1432        return Ok(existing.id);
1433    }
1434
1435    // New project — generate slug and register
1436    let slug = generate_unique_project_slug(conn, path)?;
1437    let project = KnowledgeProject {
1438        id: slug.clone(),
1439        path: path.to_string(),
1440        remote_url: crate::git::remote_url(),
1441        agent_type: "claude_code".to_string(),
1442        last_seen: Utc::now(),
1443    };
1444    upsert_project(conn, &project)?;
1445    Ok(slug)
1446}
1447
1448pub fn get_project_by_remote_url(conn: &Connection, remote_url: &str) -> Result<Option<KnowledgeProject>, CoreError> {
1449    let mut stmt = conn.prepare(
1450        "SELECT id, path, remote_url, agent_type, last_seen FROM projects WHERE remote_url = ?1",
1451    )?;
1452    let result = stmt.query_row(params![remote_url], |row| {
1453        Ok(KnowledgeProject {
1454            id: row.get(0)?,
1455            path: row.get(1)?,
1456            remote_url: row.get(2)?,
1457            agent_type: row.get(3)?,
1458            last_seen: DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)
1459                .unwrap_or_default()
1460                .with_timezone(&Utc),
1461        })
1462    }).optional()?;
1463    Ok(result)
1464}
1465
1466/// Get active nodes that haven't been projected yet, ordered by confidence DESC.
1467pub fn get_unprojected_nodes(conn: &Connection) -> Result<Vec<KnowledgeNode>, CoreError> {
1468    let mut stmt = conn.prepare(
1469        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1470         FROM nodes WHERE status = 'active' AND projected_at IS NULL
1471         ORDER BY confidence DESC",
1472    )?;
1473    let rows = stmt.query_map([], |row| {
1474        Ok(KnowledgeNode {
1475            id: row.get(0)?,
1476            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1477            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1478            project_id: row.get(3)?,
1479            content: row.get(4)?,
1480            confidence: row.get(5)?,
1481            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1482            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1483                .unwrap_or_default()
1484                .with_timezone(&Utc),
1485            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1486                .unwrap_or_default()
1487                .with_timezone(&Utc),
1488            projected_at: row.get(9)?,
1489            pr_url: row.get(10)?,
1490        })
1491    })?;
1492    let mut nodes = Vec::new();
1493    for row in rows {
1494        nodes.push(row?);
1495    }
1496    Ok(nodes)
1497}
1498
1499/// Mark a node as projected (direct write, no PR).
1500pub fn mark_node_projected(conn: &Connection, id: &str) -> Result<(), CoreError> {
1501    conn.execute(
1502        "UPDATE nodes SET projected_at = ?1 WHERE id = ?2",
1503        params![Utc::now().to_rfc3339(), id],
1504    )?;
1505    Ok(())
1506}
1507
1508/// Mark a node as projected via PR.
1509pub fn mark_node_projected_with_pr(conn: &Connection, id: &str, pr_url: &str) -> Result<(), CoreError> {
1510    conn.execute(
1511        "UPDATE nodes SET projected_at = ?1, pr_url = ?2 WHERE id = ?3",
1512        params![Utc::now().to_rfc3339(), pr_url, id],
1513    )?;
1514    Ok(())
1515}
1516
1517/// Get all nodes with an associated PR URL.
1518pub fn get_nodes_with_pr(conn: &Connection) -> Result<Vec<KnowledgeNode>, CoreError> {
1519    let mut stmt = conn.prepare(
1520        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1521         FROM nodes WHERE pr_url IS NOT NULL",
1522    )?;
1523    let rows = stmt.query_map([], |row| {
1524        Ok(KnowledgeNode {
1525            id: row.get(0)?,
1526            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1527            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1528            project_id: row.get(3)?,
1529            content: row.get(4)?,
1530            confidence: row.get(5)?,
1531            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1532            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1533                .unwrap_or_default()
1534                .with_timezone(&Utc),
1535            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1536                .unwrap_or_default()
1537                .with_timezone(&Utc),
1538            projected_at: row.get(9)?,
1539            pr_url: row.get(10)?,
1540        })
1541    })?;
1542    let mut nodes = Vec::new();
1543    for row in rows {
1544        nodes.push(row?);
1545    }
1546    Ok(nodes)
1547}
1548
1549/// Clear PR URL from nodes after merge.
1550pub fn clear_node_pr(conn: &Connection, pr_url: &str) -> Result<(), CoreError> {
1551    conn.execute(
1552        "UPDATE nodes SET pr_url = NULL WHERE pr_url = ?1",
1553        params![pr_url],
1554    )?;
1555    Ok(())
1556}
1557
1558/// Dismiss all nodes for a closed PR.
1559pub fn dismiss_nodes_for_pr(conn: &Connection, pr_url: &str) -> Result<(), CoreError> {
1560    conn.execute(
1561        "UPDATE nodes SET status = 'dismissed', pr_url = NULL WHERE pr_url = ?1",
1562        params![pr_url],
1563    )?;
1564    Ok(())
1565}
1566
1567pub fn get_all_projects(conn: &Connection) -> Result<Vec<KnowledgeProject>, CoreError> {
1568    let mut stmt = conn.prepare(
1569        "SELECT id, path, remote_url, agent_type, last_seen FROM projects ORDER BY last_seen DESC",
1570    )?;
1571    let rows = stmt.query_map([], |row| {
1572        Ok(KnowledgeProject {
1573            id: row.get(0)?,
1574            path: row.get(1)?,
1575            remote_url: row.get(2)?,
1576            agent_type: row.get(3)?,
1577            last_seen: DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)
1578                .unwrap_or_default()
1579                .with_timezone(&Utc),
1580        })
1581    })?;
1582    let mut projects = Vec::new();
1583    for row in rows {
1584        projects.push(row?);
1585    }
1586    Ok(projects)
1587}
1588
1589#[cfg(test)]
1590mod tests {
1591    use super::*;
1592    use crate::models::*;
1593
1594    fn test_db() -> Connection {
1595        let conn = Connection::open_in_memory().unwrap();
1596        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
1597        migrate(&conn).unwrap();
1598        conn
1599    }
1600
1601    fn test_pattern(id: &str, description: &str) -> Pattern {
1602        Pattern {
1603            id: id.to_string(),
1604            pattern_type: PatternType::RepetitiveInstruction,
1605            description: description.to_string(),
1606            confidence: 0.85,
1607            times_seen: 1,
1608            first_seen: Utc::now(),
1609            last_seen: Utc::now(),
1610            last_projected: None,
1611            status: PatternStatus::Discovered,
1612            source_sessions: vec!["sess-1".to_string()],
1613            related_files: vec![],
1614            suggested_content: "Always do X".to_string(),
1615            suggested_target: SuggestedTarget::ClaudeMd,
1616            project: Some("/test/project".to_string()),
1617            generation_failed: false,
1618        }
1619    }
1620
/// A pattern stored with `insert_pattern` can be read back through
/// `get_all_patterns` with its id, description and confidence intact.
#[test]
fn test_insert_and_get_pattern() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Use uv for Python packages")).unwrap();

    let stored = get_all_patterns(&conn, None).unwrap();
    assert_eq!(stored.len(), 1);

    let only = &stored[0];
    assert_eq!(only.id, "pat-1");
    assert_eq!(only.description, "Use uv for Python packages");
    assert!((only.confidence - 0.85).abs() < f64::EPSILON);
}
1633
/// Merging two extra sessions into an existing pattern bumps `times_seen`
/// from 1 to 3, replaces the confidence, and grows `source_sessions` to 3.
#[test]
fn test_pattern_merge_update() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Use uv for Python packages")).unwrap();

    let extra_sessions = ["sess-2".to_string(), "sess-3".to_string()];
    update_pattern_merge(&conn, "pat-1", &extra_sessions, 0.92, Utc::now(), 2).unwrap();

    let stored = get_all_patterns(&conn, None).unwrap();
    let merged = &stored[0];
    assert_eq!(merged.times_seen, 3);
    assert!((merged.confidence - 0.92).abs() < f64::EPSILON);
    assert_eq!(merged.source_sessions.len(), 3);
}
1655
/// `get_patterns` filters by the given status names, individually and combined.
#[test]
fn test_get_patterns_by_status() {
    let conn = test_db();
    let discovered_fixture = test_pattern("pat-1", "Pattern one");
    let active_fixture = Pattern {
        status: PatternStatus::Active,
        ..test_pattern("pat-2", "Pattern two")
    };
    for fixture in [&discovered_fixture, &active_fixture] {
        insert_pattern(&conn, fixture).unwrap();
    }

    // Each single-status query must return exactly the pattern in that state.
    for (status, expected_id) in [("discovered", "pat-1"), ("active", "pat-2")] {
        let found = get_patterns(&conn, &[status], None).unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].id, expected_id);
    }

    let combined = get_patterns(&conn, &["discovered", "active"], None).unwrap();
    assert_eq!(combined.len(), 2);
}
1676
/// A session is reported as analyzed only after `record_analyzed_session`
/// has been called for that specific session id.
#[test]
fn test_analyzed_session_tracking() {
    let conn = test_db();
    let analyzed = |session_id: &str| is_session_analyzed(&conn, session_id).unwrap();

    assert!(!analyzed("sess-1"));

    record_analyzed_session(&conn, "sess-1", "/test").unwrap();
    assert!(analyzed("sess-1"));
    // Recording one session must not mark a different one.
    assert!(!analyzed("sess-2"));
}
1686
/// An ingested session is pending analysis until it is recorded as analyzed;
/// with the rolling flag set it is returned even after that.
#[test]
fn test_sessions_for_analysis() {
    let conn = test_db();

    // Seed a single ingested session.
    let ingested = IngestedSession {
        session_id: String::from("sess-1"),
        project: String::from("/test"),
        session_path: String::from("/tmp/test.jsonl"),
        file_size: 100,
        file_mtime: String::from("2026-01-01T00:00:00Z"),
        ingested_at: Utc::now(),
    };
    record_ingested_session(&conn, &ingested).unwrap();

    let since = Utc::now() - chrono::Duration::days(14);
    let pending_count = |rolling: bool| {
        get_sessions_for_analysis(&conn, None, &since, rolling)
            .unwrap()
            .len()
    };

    // Non-rolling: the fresh session is pending.
    assert_eq!(pending_count(false), 1);

    // Non-rolling: once analyzed it drops out.
    record_analyzed_session(&conn, "sess-1", "/test").unwrap();
    assert_eq!(pending_count(false), 0);

    // Rolling window: already-analyzed sessions are still returned.
    assert_eq!(pending_count(true), 1);
}
1716
/// `has_projection_for_pattern` becomes true only for the pattern that
/// actually received a projection.
#[test]
fn test_insert_and_check_projection() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Use uv")).unwrap();

    let projected = |pattern_id: &str| has_projection_for_pattern(&conn, pattern_id).unwrap();

    // Nothing has been projected yet.
    assert!(!projected("pat-1"));

    let projection = Projection {
        id: String::from("proj-1"),
        pattern_id: String::from("pat-1"),
        target_type: String::from("claude_md"),
        target_path: String::from("/test/CLAUDE.md"),
        content: String::from("Always use uv"),
        applied_at: Utc::now(),
        pr_url: None,
        status: ProjectionStatus::Applied,
    };
    insert_projection(&conn, &projection).unwrap();

    assert!(projected("pat-1"));
    assert!(!projected("pat-2"));
}
1740
/// After `update_pattern_status` sets a pattern to `Active`, it is returned
/// by a `get_patterns` query for the `active` status.
#[test]
fn test_update_pattern_status() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Test pattern")).unwrap();

    update_pattern_status(&conn, "pat-1", &PatternStatus::Active).unwrap();

    let active = get_patterns(&conn, &["active"], None).unwrap();
    assert_eq!(active.len(), 1);
    assert_eq!(active[0].id, "pat-1");
}
1752
/// `set_generation_failed` toggles the stored flag on and back off again.
#[test]
fn test_set_generation_failed() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Test pattern")).unwrap();

    // Reads the flag of the first (and only) stored pattern.
    let stored_flag = || get_all_patterns(&conn, None).unwrap()[0].generation_failed;

    // The fixture starts with the flag cleared.
    assert!(!stored_flag());

    set_generation_failed(&conn, "pat-1", true).unwrap();
    assert!(stored_flag());

    set_generation_failed(&conn, "pat-1", false).unwrap();
    assert!(!stored_flag());
}
1767
/// The `projections.nudged` column exists and is 0 for a row written by
/// `insert_projection`.
#[test]
fn test_projections_nudged_column_defaults_to_zero() {
    let conn = test_db();

    // Preparing a statement that names the column fails if the column is missing.
    conn.prepare("SELECT nudged FROM projections").unwrap();

    // The projection needs an existing pattern to point at.
    insert_pattern(&conn, &test_pattern("pat-1", "Test pattern")).unwrap();

    let projection = Projection {
        id: String::from("proj-1"),
        pattern_id: String::from("pat-1"),
        target_type: String::from("claude_md"),
        target_path: String::from("/test/CLAUDE.md"),
        content: String::from("Always use uv"),
        applied_at: Utc::now(),
        pr_url: None,
        status: ProjectionStatus::Applied,
    };
    insert_projection(&conn, &projection).unwrap();

    // Read the raw column value straight from the table.
    let query = "SELECT nudged FROM projections WHERE id = 'proj-1'";
    let nudged: i64 = conn.query_row(query, [], |row| row.get(0)).unwrap();
    assert_eq!(nudged, 0, "nudged column should default to 0");
}
1800
1801    // ── Tests for auto-apply pipeline DB functions ──
1802
/// With no projections stored, `last_applied_at` yields `None`.
#[test]
fn test_last_applied_at_empty() {
    let conn = test_db();
    assert_eq!(last_applied_at(&conn).unwrap(), None);
}
1809
/// With two projections applied at different times, `last_applied_at`
/// reports the later of the two timestamps.
#[test]
fn test_last_applied_at_returns_max() {
    let conn = test_db();

    // Each projection points at its own pattern, so create those first.
    for (id, description) in [("pat-1", "Pattern one"), ("pat-2", "Pattern two")] {
        insert_pattern(&conn, &test_pattern(id, description)).unwrap();
    }

    // Parses a fixed RFC 3339 literal into a UTC timestamp.
    let at = |rfc3339: &str| {
        chrono::DateTime::parse_from_rfc3339(rfc3339)
            .unwrap()
            .with_timezone(&Utc)
    };
    let fixtures = [
        ("proj-1", "pat-1", "/path/a", "content a", at("2026-01-10T00:00:00Z")),
        ("proj-2", "pat-2", "/path/b", "content b", at("2026-02-15T12:00:00Z")),
    ];
    for (id, pattern_id, target_path, content, applied_at) in fixtures {
        let projection = Projection {
            id: id.to_string(),
            pattern_id: pattern_id.to_string(),
            target_type: "Skill".to_string(),
            target_path: target_path.to_string(),
            content: content.to_string(),
            applied_at,
            pr_url: None,
            status: ProjectionStatus::Applied,
        };
        insert_projection(&conn, &projection).unwrap();
    }

    // `expect` replaces the former `assert!(is_some())` + `unwrap()` pair and
    // names the violated expectation if no timestamp comes back.
    let max_ts = last_applied_at(&conn)
        .unwrap()
        .expect("last_applied_at should return a timestamp once projections exist");
    assert!(max_ts.contains("2026-02-15"), "Expected later timestamp, got: {}", max_ts);
}
1857
/// A freshly migrated database reports no unanalyzed sessions.
#[test]
fn test_has_unanalyzed_sessions_empty() {
    let conn = test_db();
    let any_unanalyzed = has_unanalyzed_sessions(&conn).unwrap();
    assert!(!any_unanalyzed);
}
1863
/// An ingested session that has not been analyzed makes
/// `has_unanalyzed_sessions` return true.
#[test]
fn test_has_unanalyzed_sessions_with_new_session() {
    let conn = test_db();

    let ingested = IngestedSession {
        session_id: String::from("sess-1"),
        project: String::from("/test"),
        session_path: String::from("/tmp/test.jsonl"),
        file_size: 100,
        file_mtime: String::from("2026-01-01T00:00:00Z"),
        ingested_at: Utc::now(),
    };
    record_ingested_session(&conn, &ingested).unwrap();

    let any_unanalyzed = has_unanalyzed_sessions(&conn).unwrap();
    assert!(any_unanalyzed);
}
1880
/// Once the only ingested session is recorded as analyzed,
/// `has_unanalyzed_sessions` returns false again.
#[test]
fn test_has_unanalyzed_sessions_after_analysis() {
    let conn = test_db();

    let ingested = IngestedSession {
        session_id: String::from("sess-1"),
        project: String::from("/test"),
        session_path: String::from("/tmp/test.jsonl"),
        file_size: 100,
        file_mtime: String::from("2026-01-01T00:00:00Z"),
        ingested_at: Utc::now(),
    };
    record_ingested_session(&conn, &ingested).unwrap();
    record_analyzed_session(&conn, "sess-1", "/test").unwrap();

    let any_unanalyzed = has_unanalyzed_sessions(&conn).unwrap();
    assert!(!any_unanalyzed);
}
1898
/// With no patterns stored, nothing is waiting to be projected.
#[test]
fn test_has_unprojected_patterns_empty() {
    let conn = test_db();
    let any_unprojected = has_unprojected_patterns(&conn, 0.0).unwrap();
    assert!(!any_unprojected);
}
1904
/// A discovered pattern without a projection counts as unprojected.
#[test]
fn test_has_unprojected_patterns_with_discovered() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Use uv for Python")).unwrap();

    let any_unprojected = has_unprojected_patterns(&conn, 0.0).unwrap();
    assert!(any_unprojected);
}
1914
/// A pattern that already has an applied projection is no longer unprojected.
#[test]
fn test_has_unprojected_patterns_after_projection() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Use uv for Python")).unwrap();

    let projection = Projection {
        id: String::from("proj-1"),
        pattern_id: String::from("pat-1"),
        target_type: String::from("Skill"),
        target_path: String::from("/path"),
        content: String::from("content"),
        applied_at: Utc::now(),
        pr_url: Some(String::from("https://github.com/test/pull/1")),
        status: ProjectionStatus::Applied,
    };
    insert_projection(&conn, &projection).unwrap();

    let any_unprojected = has_unprojected_patterns(&conn, 0.0).unwrap();
    assert!(!any_unprojected);
}
1936
/// Patterns flagged with `generation_failed` are not reported as unprojected.
#[test]
fn test_has_unprojected_patterns_excludes_generation_failed() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Use uv for Python")).unwrap();
    set_generation_failed(&conn, "pat-1", true).unwrap();

    let any_unprojected = has_unprojected_patterns(&conn, 0.0).unwrap();
    assert!(!any_unprojected);
}
1947
/// Patterns whose suggested target is `DbOnly` are not reported as unprojected.
#[test]
fn test_has_unprojected_patterns_excludes_dbonly() {
    let conn = test_db();

    let db_only = Pattern {
        suggested_target: SuggestedTarget::DbOnly,
        ..test_pattern("pat-1", "Internal tracking only")
    };
    insert_pattern(&conn, &db_only).unwrap();

    let any_unprojected = has_unprojected_patterns(&conn, 0.0).unwrap();
    assert!(!any_unprojected);
}
1958
/// Walks the ingest → analyze → apply sequence and checks that the two
/// trigger queries flip at each step.
#[test]
fn test_auto_apply_data_triggers_full_flow() {
    let conn = test_db();
    let needs_analysis = || has_unanalyzed_sessions(&conn).unwrap();
    let needs_apply = || has_unprojected_patterns(&conn, 0.0).unwrap();

    // Empty database: neither trigger fires.
    assert!(!needs_analysis());
    assert!(!needs_apply());

    // Step 1: an ingested session raises the analyze trigger.
    let ingested = IngestedSession {
        session_id: String::from("sess-1"),
        project: String::from("/proj"),
        session_path: String::from("/path/sess"),
        file_size: 100,
        file_mtime: String::from("2025-01-01T00:00:00Z"),
        ingested_at: Utc::now(),
    };
    record_ingested_session(&conn, &ingested).unwrap();
    assert!(needs_analysis());

    // Step 2: marking the session analyzed clears it; a new pattern raises the apply trigger.
    record_analyzed_session(&conn, "sess-1", "/proj").unwrap();
    assert!(!needs_analysis());

    insert_pattern(&conn, &test_pattern("pat-1", "Always use cargo fmt")).unwrap();
    assert!(needs_apply());

    // Step 3: a projection carrying a PR URL clears the apply trigger.
    let projection = Projection {
        id: String::from("proj-1"),
        pattern_id: String::from("pat-1"),
        target_type: String::from("Skill"),
        target_path: String::from("/skills/cargo-fmt.md"),
        content: String::from("skill content"),
        applied_at: Utc::now(),
        pr_url: Some(String::from("https://github.com/test/pull/42")),
        status: ProjectionStatus::Applied,
    };
    insert_projection(&conn, &projection).unwrap();
    assert!(!needs_apply());
}
2001
/// Before any nudge has been recorded, `get_last_nudge_at` yields `None`.
#[test]
fn test_get_last_nudge_at_empty() {
    let conn = test_db();
    let last_nudge = get_last_nudge_at(&conn).unwrap();
    assert!(last_nudge.is_none());
}
2007
/// `unanalyzed_session_count` tracks ingested sessions minus analyzed ones.
#[test]
fn test_unanalyzed_session_count() {
    let conn = test_db();
    let remaining = || unanalyzed_session_count(&conn).unwrap();

    assert_eq!(remaining(), 0);

    // Ingest sess-1, sess-2 and sess-3.
    for n in 1..=3 {
        let ingested = IngestedSession {
            session_id: format!("sess-{n}"),
            project: String::from("/proj"),
            session_path: format!("/path/sess-{n}"),
            file_size: 100,
            file_mtime: String::from("2025-01-01T00:00:00Z"),
            ingested_at: Utc::now(),
        };
        record_ingested_session(&conn, &ingested).unwrap();
    }
    assert_eq!(remaining(), 3);

    // Analyzing one of them leaves two outstanding.
    record_analyzed_session(&conn, "sess-1", "/proj").unwrap();
    assert_eq!(remaining(), 2);
}
2031
/// A timestamp written with `set_last_nudge_at` is read back by
/// `get_last_nudge_at`, equal down to the second.
#[test]
fn test_set_and_get_last_nudge_at() {
    let conn = test_db();

    let written = Utc::now();
    set_last_nudge_at(&conn, &written).unwrap();
    let read_back = get_last_nudge_at(&conn).unwrap().unwrap();

    // Only whole seconds are compared (the DB stores RFC 3339).
    let whole_seconds = "%Y-%m-%dT%H:%M:%S";
    assert_eq!(
        read_back.format(whole_seconds).to_string(),
        written.format(whole_seconds).to_string()
    );
}
2044
/// A `PendingReview` projection is stored with the raw status text
/// `pending_review` in the `projections.status` column.
#[test]
fn test_projection_status_column_exists() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Test")).unwrap();

    let projection = Projection {
        id: String::from("proj-1"),
        pattern_id: String::from("pat-1"),
        target_type: String::from("skill"),
        target_path: String::from("/test/skill.md"),
        content: String::from("content"),
        applied_at: Utc::now(),
        pr_url: None,
        status: ProjectionStatus::PendingReview,
    };
    insert_projection(&conn, &projection).unwrap();

    // Inspect the column directly rather than going through the model type.
    let query = "SELECT status FROM projections WHERE id = 'proj-1'";
    let stored_status: String = conn.query_row(query, [], |row| row.get(0)).unwrap();
    assert_eq!(stored_status, "pending_review");
}
2072
/// A projection row written before the `status` column existed reads back
/// as `applied` once `migrate` has run over the legacy database.
#[test]
fn test_existing_projections_default_to_applied() {
    // Use a raw connection (not `test_db`) so the legacy schema can be built by hand.
    let legacy = Connection::open_in_memory().unwrap();
    legacy.pragma_update(None, "journal_mode", "WAL").unwrap();

    // Hand-written schema stamped as user_version 1; `projections` has no `status` column.
    let legacy_schema = "CREATE TABLE patterns (
                id TEXT PRIMARY KEY, pattern_type TEXT NOT NULL, description TEXT NOT NULL,
                confidence REAL NOT NULL, times_seen INTEGER NOT NULL DEFAULT 1,
                first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, last_projected TEXT,
                status TEXT NOT NULL DEFAULT 'discovered', source_sessions TEXT NOT NULL,
                related_files TEXT NOT NULL, suggested_content TEXT NOT NULL,
                suggested_target TEXT NOT NULL, project TEXT,
                generation_failed INTEGER NOT NULL DEFAULT 0
            );
            CREATE TABLE projections (
                id TEXT PRIMARY KEY, pattern_id TEXT NOT NULL REFERENCES patterns(id),
                target_type TEXT NOT NULL, target_path TEXT NOT NULL, content TEXT NOT NULL,
                applied_at TEXT NOT NULL, pr_url TEXT, nudged INTEGER NOT NULL DEFAULT 0
            );
            CREATE TABLE analyzed_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, analyzed_at TEXT NOT NULL);
            CREATE TABLE ingested_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, session_path TEXT NOT NULL, file_size INTEGER NOT NULL, file_mtime TEXT NOT NULL, ingested_at TEXT NOT NULL);
            PRAGMA user_version = 1;";
    legacy.execute_batch(legacy_schema).unwrap();

    // The projection below references this pattern.
    let seed_pattern = "INSERT INTO patterns (id, pattern_type, description, confidence, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target)
             VALUES ('pat-1', 'workflow_pattern', 'Test', 0.8, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z', 'discovered', '[]', '[]', 'content', 'skill')";
    legacy.execute(seed_pattern, []).unwrap();

    // Legacy-style projection row: written without any status value.
    let seed_projection = "INSERT INTO projections (id, pattern_id, target_type, target_path, content, applied_at)
             VALUES ('proj-old', 'pat-1', 'skill', '/path', 'content', '2026-01-01T00:00:00Z')";
    legacy.execute(seed_projection, []).unwrap();

    // Bring the legacy database up to the current schema.
    migrate(&legacy).unwrap();

    // The pre-existing row must now report `applied`.
    let migrated_status: String = legacy
        .query_row(
            "SELECT status FROM projections WHERE id = 'proj-old'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(migrated_status, "applied");
}
2123
/// `get_pending_review_projections` returns the pending-review projection
/// and leaves out the applied one.
#[test]
fn test_get_pending_review_projections() {
    let conn = test_db();
    for (id, description) in [("pat-1", "Pattern one"), ("pat-2", "Pattern two")] {
        insert_pattern(&conn, &test_pattern(id, description)).unwrap();
    }

    // proj-1 awaits review, proj-2 is already applied.
    let awaiting_review = Projection {
        id: String::from("proj-1"),
        pattern_id: String::from("pat-1"),
        target_type: String::from("skill"),
        target_path: String::from("/test/a.md"),
        content: String::from("content a"),
        applied_at: Utc::now(),
        pr_url: None,
        status: ProjectionStatus::PendingReview,
    };
    let already_applied = Projection {
        id: String::from("proj-2"),
        pattern_id: String::from("pat-2"),
        target_type: String::from("skill"),
        target_path: String::from("/test/b.md"),
        content: String::from("content b"),
        applied_at: Utc::now(),
        pr_url: None,
        status: ProjectionStatus::Applied,
    };
    for projection in [&awaiting_review, &already_applied] {
        insert_projection(&conn, projection).unwrap();
    }

    let pending = get_pending_review_projections(&conn).unwrap();
    assert_eq!(pending.len(), 1);
    assert_eq!(pending[0].id, "proj-1");
}
2160
/// `update_projection_status` rewrites the stored status from
/// `pending_review` to `applied`.
#[test]
fn test_update_projection_status() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Pattern")).unwrap();

    let projection = Projection {
        id: String::from("proj-1"),
        pattern_id: String::from("pat-1"),
        target_type: String::from("skill"),
        target_path: String::from("/test.md"),
        content: String::from("content"),
        applied_at: Utc::now(),
        pr_url: None,
        status: ProjectionStatus::PendingReview,
    };
    insert_projection(&conn, &projection).unwrap();

    update_projection_status(&conn, "proj-1", &ProjectionStatus::Applied).unwrap();

    // Check the raw column value.
    let query = "SELECT status FROM projections WHERE id = 'proj-1'";
    let stored_status: String = conn.query_row(query, [], |row| row.get(0)).unwrap();
    assert_eq!(stored_status, "applied");
}
2186
/// After `delete_projection`, the pattern no longer has a projection.
#[test]
fn test_delete_projection() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Pattern")).unwrap();

    let projected = || has_projection_for_pattern(&conn, "pat-1").unwrap();

    let projection = Projection {
        id: String::from("proj-1"),
        pattern_id: String::from("pat-1"),
        target_type: String::from("skill"),
        target_path: String::from("/test.md"),
        content: String::from("content"),
        applied_at: Utc::now(),
        pr_url: None,
        status: ProjectionStatus::PendingReview,
    };
    insert_projection(&conn, &projection).unwrap();
    assert!(projected());

    delete_projection(&conn, "proj-1").unwrap();
    assert!(!projected());
}
2209
/// Of two applied projections, `get_applied_projections_with_pr` returns
/// only the one that carries a PR URL.
#[test]
fn test_get_projections_with_pr_url() {
    let conn = test_db();
    for (id, description) in [("pat-1", "Pattern one"), ("pat-2", "Pattern two")] {
        insert_pattern(&conn, &test_pattern(id, description)).unwrap();
    }

    let with_url = Projection {
        id: String::from("proj-1"),
        pattern_id: String::from("pat-1"),
        target_type: String::from("skill"),
        target_path: String::from("/a.md"),
        content: String::from("a"),
        applied_at: Utc::now(),
        pr_url: Some(String::from("https://github.com/test/pull/1")),
        status: ProjectionStatus::Applied,
    };
    let without_url = Projection {
        id: String::from("proj-2"),
        pattern_id: String::from("pat-2"),
        target_type: String::from("skill"),
        target_path: String::from("/b.md"),
        content: String::from("b"),
        applied_at: Utc::now(),
        pr_url: None,
        status: ProjectionStatus::Applied,
    };
    for projection in [&with_url, &without_url] {
        insert_projection(&conn, projection).unwrap();
    }

    let found = get_applied_projections_with_pr(&conn).unwrap();
    assert_eq!(found.len(), 1);
    assert_eq!(found[0].pr_url, Some("https://github.com/test/pull/1".to_string()));
}
2245
/// `get_projected_pattern_ids_by_status` returns the pattern ids whose
/// projections match any of the requested statuses.
#[test]
fn test_get_projected_pattern_ids_by_status() {
    let conn = test_db();
    for (id, description) in [("pat-1", "Pattern one"), ("pat-2", "Pattern two")] {
        insert_pattern(&conn, &test_pattern(id, description)).unwrap();
    }

    // pat-1 gets an applied projection, pat-2 a pending-review one.
    let applied = Projection {
        id: String::from("proj-1"),
        pattern_id: String::from("pat-1"),
        target_type: String::from("skill"),
        target_path: String::from("/a.md"),
        content: String::from("a"),
        applied_at: Utc::now(),
        pr_url: None,
        status: ProjectionStatus::Applied,
    };
    let awaiting_review = Projection {
        id: String::from("proj-2"),
        pattern_id: String::from("pat-2"),
        target_type: String::from("skill"),
        target_path: String::from("/b.md"),
        content: String::from("b"),
        applied_at: Utc::now(),
        pr_url: None,
        status: ProjectionStatus::PendingReview,
    };
    for projection in [&applied, &awaiting_review] {
        insert_projection(&conn, projection).unwrap();
    }

    // Asking for both statuses yields both patterns.
    let either = get_projected_pattern_ids_by_status(
        &conn,
        &[ProjectionStatus::Applied, ProjectionStatus::PendingReview],
    )
    .unwrap();
    assert_eq!(either.len(), 2);

    // Asking for applied only yields just pat-1.
    let applied_only =
        get_projected_pattern_ids_by_status(&conn, &[ProjectionStatus::Applied]).unwrap();
    assert_eq!(applied_only.len(), 1);
    assert!(applied_only.contains("pat-1"));
}
2284
/// Dismissed patterns are not reported as unprojected.
#[test]
fn test_has_unprojected_patterns_excludes_dismissed() {
    let conn = test_db();

    let dismissed = Pattern {
        status: PatternStatus::Dismissed,
        ..test_pattern("pat-1", "Dismissed pattern")
    };
    insert_pattern(&conn, &dismissed).unwrap();

    let any_unprojected = has_unprojected_patterns(&conn, 0.0).unwrap();
    assert!(!any_unprojected);
}
2295
/// A pattern whose only projection is still pending review is not reported
/// as unprojected.
#[test]
fn test_has_unprojected_patterns_excludes_pending_review() {
    let conn = test_db();
    insert_pattern(&conn, &test_pattern("pat-1", "Pattern with pending review")).unwrap();

    // Attach a projection that is still awaiting review.
    let awaiting_review = Projection {
        id: String::from("proj-1"),
        pattern_id: String::from("pat-1"),
        target_type: String::from("skill"),
        target_path: String::from("/test.md"),
        content: String::from("content"),
        applied_at: Utc::now(),
        pr_url: None,
        status: ProjectionStatus::PendingReview,
    };
    insert_projection(&conn, &awaiting_review).unwrap();

    // The pending projection is enough to take the pattern off the unprojected list.
    let any_unprojected = has_unprojected_patterns(&conn, 0.0).unwrap();
    assert!(!any_unprojected);
}
2319
/// Migrating an empty database stamps the expected `user_version` and
/// creates the `nodes`, `edges` and `projects` tables.
#[test]
fn test_v4_migration_creates_tables() {
    // Shared helper: in-memory connection with `migrate` already applied.
    let conn = test_db();

    // NOTE(review): the expected version is hard-coded to 6, while the
    // `SCHEMA_VERSION` constant at the top of this file is 5 — confirm which
    // one is authoritative and keep them in sync.
    let version: u32 = conn
        .pragma_query_value(None, "user_version", |row| row.get(0))
        .unwrap();
    assert_eq!(version, 6);

    // A query against a missing table fails, so a successful COUNT(*) proves
    // the table exists. `WHERE 1=0` matches no rows, so the count is 0.
    // This checks table existence only, not the column layout.
    for table in ["nodes", "edges", "projects"] {
        let sql = format!("SELECT COUNT(*) FROM {table} WHERE 1=0");
        let count: i64 = conn
            .query_row(&sql, [], |row| row.get(0))
            .unwrap_or_else(|err| panic!("table `{table}` should exist after migration: {err}"));
        assert_eq!(count, 0);
    }
}
2347
2348    // ── Node CRUD tests ──
2349
/// A node stored with `insert_node` is returned by `get_node` with its
/// content, type, scope and confidence intact.
#[test]
fn test_insert_and_get_node() {
    // Use the shared helper instead of repeating the open/pragma/migrate setup.
    let conn = test_db();

    let node = KnowledgeNode {
        id: "node-1".to_string(),
        node_type: NodeType::Rule,
        scope: NodeScope::Project,
        project_id: Some("my-app".to_string()),
        content: "Always run tests".to_string(),
        confidence: 0.85,
        status: NodeStatus::Active,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        projected_at: None,
        pr_url: None,
    };
    insert_node(&conn, &node).unwrap();

    // The outer `unwrap` covers the query, the inner one the "row exists" case.
    let retrieved = get_node(&conn, "node-1").unwrap().unwrap();
    assert_eq!(retrieved.content, "Always run tests");
    assert_eq!(retrieved.node_type, NodeType::Rule);
    assert_eq!(retrieved.scope, NodeScope::Project);
    assert_eq!(retrieved.confidence, 0.85);
}
2377
/// `get_nodes_by_scope` separates global nodes from project-scoped ones.
///
/// Three active nodes are inserted (two global, one for project `my-app`);
/// the global query must return two and the project query one.
#[test]
fn test_get_nodes_by_scope_and_status() {
    // Use the shared helper instead of repeating the open/pragma/migrate setup.
    let conn = test_db();

    let now = Utc::now();
    let scopes = [NodeScope::Global, NodeScope::Project, NodeScope::Global];
    for (i, scope) in scopes.iter().enumerate() {
        // Only project-scoped nodes carry a project id.
        let project_id = (*scope == NodeScope::Project).then(|| "my-app".to_string());
        let node = KnowledgeNode {
            id: format!("node-{i}"),
            node_type: NodeType::Rule,
            scope: scope.clone(),
            project_id,
            content: format!("Rule {i}"),
            confidence: 0.8,
            status: NodeStatus::Active,
            created_at: now,
            updated_at: now,
            projected_at: None,
            pr_url: None,
        };
        insert_node(&conn, &node).unwrap();
    }

    let global_nodes =
        get_nodes_by_scope(&conn, &NodeScope::Global, None, &[NodeStatus::Active]).unwrap();
    assert_eq!(global_nodes.len(), 2);

    let project_nodes = get_nodes_by_scope(
        &conn,
        &NodeScope::Project,
        Some("my-app"),
        &[NodeStatus::Active],
    )
    .unwrap();
    assert_eq!(project_nodes.len(), 1);
}
2408
2409    #[test]
2410    fn test_update_node_confidence() {
2411        let conn = Connection::open_in_memory().unwrap();
2412        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2413        migrate(&conn).unwrap();
2414
2415        let node = KnowledgeNode {
2416            id: "node-1".to_string(),
2417            node_type: NodeType::Pattern,
2418            scope: NodeScope::Project,
2419            project_id: Some("my-app".to_string()),
2420            content: "Forgets tests".to_string(),
2421            confidence: 0.5,
2422            status: NodeStatus::Active,
2423            created_at: Utc::now(),
2424            updated_at: Utc::now(),
2425            projected_at: None,
2426            pr_url: None,
2427        };
2428        insert_node(&conn, &node).unwrap();
2429
2430        update_node_confidence(&conn, "node-1", 0.75).unwrap();
2431        let updated = get_node(&conn, "node-1").unwrap().unwrap();
2432        assert_eq!(updated.confidence, 0.75);
2433    }
2434
2435    #[test]
2436    fn test_update_node_status() {
2437        let conn = Connection::open_in_memory().unwrap();
2438        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2439        migrate(&conn).unwrap();
2440
2441        let node = KnowledgeNode {
2442            id: "node-1".to_string(),
2443            node_type: NodeType::Rule,
2444            scope: NodeScope::Global,
2445            project_id: None,
2446            content: "Use snake_case".to_string(),
2447            confidence: 0.9,
2448            status: NodeStatus::PendingReview,
2449            created_at: Utc::now(),
2450            updated_at: Utc::now(),
2451            projected_at: None,
2452            pr_url: None,
2453        };
2454        insert_node(&conn, &node).unwrap();
2455
2456        update_node_status(&conn, "node-1", &NodeStatus::Active).unwrap();
2457        let updated = get_node(&conn, "node-1").unwrap().unwrap();
2458        assert_eq!(updated.status, NodeStatus::Active);
2459    }
2460
2461    #[test]
2462    fn test_v4_migration_from_v3() {
2463        let conn = Connection::open_in_memory().unwrap();
2464        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2465
2466        // Manually create a v3-state database (v1+v2+v3 tables, user_version=3)
2467        conn.execute_batch("
2468            CREATE TABLE patterns (id TEXT PRIMARY KEY, pattern_type TEXT NOT NULL, description TEXT NOT NULL, confidence REAL NOT NULL, times_seen INTEGER NOT NULL DEFAULT 1, first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, last_projected TEXT, status TEXT NOT NULL DEFAULT 'discovered', source_sessions TEXT NOT NULL, related_files TEXT NOT NULL, suggested_content TEXT NOT NULL, suggested_target TEXT NOT NULL, project TEXT, generation_failed INTEGER NOT NULL DEFAULT 0);
2469            CREATE TABLE projections (id TEXT PRIMARY KEY, pattern_id TEXT NOT NULL, target_type TEXT NOT NULL, target_path TEXT NOT NULL, content TEXT NOT NULL, applied_at TEXT NOT NULL, pr_url TEXT, nudged INTEGER NOT NULL DEFAULT 0, status TEXT NOT NULL DEFAULT 'applied');
2470            CREATE TABLE analyzed_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, analyzed_at TEXT NOT NULL);
2471            CREATE TABLE ingested_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, session_path TEXT NOT NULL, file_size INTEGER NOT NULL, file_mtime TEXT NOT NULL, ingested_at TEXT NOT NULL);
2472            CREATE TABLE metadata (key TEXT PRIMARY KEY, value TEXT NOT NULL);
2473        ").unwrap();
2474        conn.pragma_update(None, "user_version", 3).unwrap();
2475
2476        // Now run migrate — should only add v4 tables
2477        migrate(&conn).unwrap();
2478
2479        let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0)).unwrap();
2480        assert_eq!(version, 6);
2481
2482        // v4 tables exist
2483        conn.query_row("SELECT COUNT(*) FROM nodes WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2484        conn.query_row("SELECT COUNT(*) FROM edges WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2485        conn.query_row("SELECT COUNT(*) FROM projects WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2486
2487        // Old tables still exist
2488        conn.query_row("SELECT COUNT(*) FROM patterns WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2489    }
2490
2491    // ── Edge CRUD tests ──
2492
2493    #[test]
2494    fn test_insert_and_get_edges() {
2495        let conn = Connection::open_in_memory().unwrap();
2496        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2497        migrate(&conn).unwrap();
2498
2499        let now = Utc::now();
2500        let node1 = KnowledgeNode {
2501            id: "node-1".to_string(), node_type: NodeType::Pattern,
2502            scope: NodeScope::Project, project_id: Some("app".to_string()),
2503            content: "Pattern A".to_string(), confidence: 0.5,
2504            status: NodeStatus::Active, created_at: now, updated_at: now,
2505            projected_at: None, pr_url: None,
2506        };
2507        let node2 = KnowledgeNode {
2508            id: "node-2".to_string(), node_type: NodeType::Rule,
2509            scope: NodeScope::Project, project_id: Some("app".to_string()),
2510            content: "Rule B".to_string(), confidence: 0.8,
2511            status: NodeStatus::Active, created_at: now, updated_at: now,
2512            projected_at: None, pr_url: None,
2513        };
2514        insert_node(&conn, &node1).unwrap();
2515        insert_node(&conn, &node2).unwrap();
2516
2517        let edge = KnowledgeEdge {
2518            source_id: "node-1".to_string(),
2519            target_id: "node-2".to_string(),
2520            edge_type: EdgeType::DerivedFrom,
2521            created_at: now,
2522        };
2523        insert_edge(&conn, &edge).unwrap();
2524
2525        let edges = get_edges_from(&conn, "node-1").unwrap();
2526        assert_eq!(edges.len(), 1);
2527        assert_eq!(edges[0].target_id, "node-2");
2528        assert_eq!(edges[0].edge_type, EdgeType::DerivedFrom);
2529
2530        let edges_to = get_edges_to(&conn, "node-2").unwrap();
2531        assert_eq!(edges_to.len(), 1);
2532        assert_eq!(edges_to[0].source_id, "node-1");
2533    }
2534
2535    #[test]
2536    fn test_supersede_node_archives_old() {
2537        let conn = Connection::open_in_memory().unwrap();
2538        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2539        migrate(&conn).unwrap();
2540
2541        let now = Utc::now();
2542        let old_node = KnowledgeNode {
2543            id: "old".to_string(), node_type: NodeType::Rule,
2544            scope: NodeScope::Global, project_id: None,
2545            content: "Old rule".to_string(), confidence: 0.8,
2546            status: NodeStatus::Active, created_at: now, updated_at: now,
2547            projected_at: None, pr_url: None,
2548        };
2549        let new_node = KnowledgeNode {
2550            id: "new".to_string(), node_type: NodeType::Rule,
2551            scope: NodeScope::Global, project_id: None,
2552            content: "New rule".to_string(), confidence: 0.85,
2553            status: NodeStatus::Active, created_at: now, updated_at: now,
2554            projected_at: None, pr_url: None,
2555        };
2556        insert_node(&conn, &old_node).unwrap();
2557        insert_node(&conn, &new_node).unwrap();
2558
2559        supersede_node(&conn, "new", "old").unwrap();
2560
2561        let old = get_node(&conn, "old").unwrap().unwrap();
2562        assert_eq!(old.status, NodeStatus::Archived);
2563
2564        let edges = get_edges_from(&conn, "new").unwrap();
2565        assert_eq!(edges.len(), 1);
2566        assert_eq!(edges[0].edge_type, EdgeType::Supersedes);
2567        assert_eq!(edges[0].target_id, "old");
2568    }
2569
2570    // ── Project CRUD tests ──
2571
2572    #[test]
2573    fn test_upsert_and_get_project() {
2574        let conn = Connection::open_in_memory().unwrap();
2575        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2576        migrate(&conn).unwrap();
2577
2578        let project = KnowledgeProject {
2579            id: "my-app".to_string(),
2580            path: "/home/user/my-app".to_string(),
2581            remote_url: Some("git@github.com:user/my-app.git".to_string()),
2582            agent_type: "claude_code".to_string(),
2583            last_seen: Utc::now(),
2584        };
2585        upsert_project(&conn, &project).unwrap();
2586
2587        let retrieved = get_project(&conn, "my-app").unwrap().unwrap();
2588        assert_eq!(retrieved.path, "/home/user/my-app");
2589        assert_eq!(retrieved.remote_url.unwrap(), "git@github.com:user/my-app.git");
2590    }
2591
2592    #[test]
2593    fn test_get_project_by_remote_url() {
2594        let conn = Connection::open_in_memory().unwrap();
2595        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2596        migrate(&conn).unwrap();
2597
2598        let project = KnowledgeProject {
2599            id: "my-app".to_string(),
2600            path: "/old/path".to_string(),
2601            remote_url: Some("git@github.com:user/my-app.git".to_string()),
2602            agent_type: "claude_code".to_string(),
2603            last_seen: Utc::now(),
2604        };
2605        upsert_project(&conn, &project).unwrap();
2606
2607        let found = get_project_by_remote_url(&conn, "git@github.com:user/my-app.git").unwrap();
2608        assert!(found.is_some());
2609        assert_eq!(found.unwrap().id, "my-app");
2610    }
2611
2612    #[test]
2613    fn test_get_all_projects() {
2614        let conn = Connection::open_in_memory().unwrap();
2615        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2616        migrate(&conn).unwrap();
2617
2618        for name in ["app-1", "app-2"] {
2619            let project = KnowledgeProject {
2620                id: name.to_string(),
2621                path: format!("/home/{name}"),
2622                remote_url: None,
2623                agent_type: "claude_code".to_string(),
2624                last_seen: Utc::now(),
2625            };
2626            upsert_project(&conn, &project).unwrap();
2627        }
2628
2629        let projects = get_all_projects(&conn).unwrap();
2630        assert_eq!(projects.len(), 2);
2631    }
2632
2633    #[test]
2634    fn test_generate_project_slug() {
2635        assert_eq!(generate_project_slug("/home/user/my-rust-app"), "my-rust-app");
2636        assert_eq!(generate_project_slug("/home/user/My App"), "my-app");
2637        assert_eq!(generate_project_slug("/"), "unnamed-project");
2638    }
2639
2640    #[test]
2641    fn test_migrate_patterns_to_nodes() {
2642        let conn = Connection::open_in_memory().unwrap();
2643        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2644        migrate(&conn).unwrap();
2645
2646        // Insert v1 patterns
2647        let now = Utc::now().to_rfc3339();
2648        conn.execute(
2649            "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
2650             VALUES (?1, ?2, ?3, ?4, 2, ?5, ?5, 'active', '[]', '[]', 'content', ?6, ?7, 0)",
2651            params!["p1", "repetitive_instruction", "Always run tests", 0.85, &now, "claude_md", "my-app"],
2652        ).unwrap();
2653        conn.execute(
2654            "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
2655             VALUES (?1, ?2, ?3, ?4, 1, ?5, ?5, 'discovered', '[]', '[]', 'content', ?6, ?7, 0)",
2656            params!["p2", "recurring_mistake", "Forgets imports", 0.6, &now, "skill", "my-app"],
2657        ).unwrap();
2658        conn.execute(
2659            "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
2660             VALUES (?1, ?2, ?3, ?4, 3, ?5, ?5, 'active', '[]', '[]', 'Always use snake_case', ?6, ?7, 0)",
2661            params!["p3", "repetitive_instruction", "Always use snake_case", 0.9, &now, "claude_md", "my-app"],
2662        ).unwrap();
2663
2664        let count = migrate_patterns_to_nodes(&conn).unwrap();
2665        assert_eq!(count, 3);
2666
2667        // p1: RepetitiveInstruction + ClaudeMd -> rule
2668        let node1 = get_node(&conn, "migrated-p1").unwrap().unwrap();
2669        assert_eq!(node1.node_type, NodeType::Rule);
2670        assert_eq!(node1.scope, NodeScope::Project);
2671
2672        // p2: RecurringMistake -> pattern
2673        let node2 = get_node(&conn, "migrated-p2").unwrap().unwrap();
2674        assert_eq!(node2.node_type, NodeType::Pattern);
2675
2676        // p3: confidence >= 0.85 + "always" in content -> directive (override)
2677        let node3 = get_node(&conn, "migrated-p3").unwrap().unwrap();
2678        assert_eq!(node3.node_type, NodeType::Directive);
2679    }
2680
2681    #[test]
2682    fn test_apply_graph_operations() {
2683        let conn = Connection::open_in_memory().unwrap();
2684        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2685        migrate(&conn).unwrap();
2686
2687        let ops = vec![
2688            GraphOperation::CreateNode {
2689                node_type: NodeType::Rule,
2690                scope: NodeScope::Project,
2691                project_id: Some("my-app".to_string()),
2692                content: "Always run tests".to_string(),
2693                confidence: 0.85,
2694            },
2695            GraphOperation::CreateNode {
2696                node_type: NodeType::Pattern,
2697                scope: NodeScope::Global,
2698                project_id: None,
2699                content: "Prefers TDD".to_string(),
2700                confidence: 0.6,
2701            },
2702        ];
2703
2704        let result = apply_graph_operations(&conn, &ops).unwrap();
2705        assert_eq!(result.nodes_created, 2);
2706
2707        let nodes = get_nodes_by_scope(&conn, &NodeScope::Project, Some("my-app"), &[NodeStatus::Active]).unwrap();
2708        assert_eq!(nodes.len(), 1);
2709        assert_eq!(nodes[0].content, "Always run tests");
2710    }
2711
2712    #[test]
2713    fn test_apply_graph_operations_update() {
2714        let conn = Connection::open_in_memory().unwrap();
2715        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2716        migrate(&conn).unwrap();
2717
2718        let node = KnowledgeNode {
2719            id: "node-1".to_string(),
2720            node_type: NodeType::Pattern,
2721            scope: NodeScope::Project,
2722            project_id: Some("app".to_string()),
2723            content: "Old content".to_string(),
2724            confidence: 0.5,
2725            status: NodeStatus::Active,
2726            created_at: Utc::now(),
2727            updated_at: Utc::now(),
2728            projected_at: None,
2729            pr_url: None,
2730        };
2731        insert_node(&conn, &node).unwrap();
2732
2733        let ops = vec![
2734            GraphOperation::UpdateNode {
2735                id: "node-1".to_string(),
2736                confidence: Some(0.8),
2737                content: Some("Updated content".to_string()),
2738            },
2739        ];
2740
2741        let result = apply_graph_operations(&conn, &ops).unwrap();
2742        assert_eq!(result.nodes_updated, 1);
2743
2744        let updated = get_node(&conn, "node-1").unwrap().unwrap();
2745        assert_eq!(updated.confidence, 0.8);
2746        assert_eq!(updated.content, "Updated content");
2747    }
2748
2749    #[test]
2750    fn test_get_nodes_by_status() {
2751        let conn = test_db();
2752
2753        let active_node = KnowledgeNode {
2754            id: "n1".to_string(),
2755            node_type: NodeType::Rule,
2756            scope: NodeScope::Project,
2757            project_id: Some("my-app".to_string()),
2758            content: "Always run tests".to_string(),
2759            confidence: 0.85,
2760            status: NodeStatus::Active,
2761            created_at: Utc::now(),
2762            updated_at: Utc::now(),
2763            projected_at: None,
2764            pr_url: None,
2765        };
2766        let pending_node = KnowledgeNode {
2767            id: "n2".to_string(),
2768            node_type: NodeType::Directive,
2769            scope: NodeScope::Global,
2770            project_id: None,
2771            content: "Use snake_case".to_string(),
2772            confidence: 0.9,
2773            status: NodeStatus::PendingReview,
2774            created_at: Utc::now(),
2775            updated_at: Utc::now(),
2776            projected_at: None,
2777            pr_url: None,
2778        };
2779        let dismissed_node = KnowledgeNode {
2780            id: "n3".to_string(),
2781            node_type: NodeType::Pattern,
2782            scope: NodeScope::Project,
2783            project_id: Some("my-app".to_string()),
2784            content: "Old pattern".to_string(),
2785            confidence: 0.5,
2786            status: NodeStatus::Dismissed,
2787            created_at: Utc::now(),
2788            updated_at: Utc::now(),
2789            projected_at: None,
2790            pr_url: None,
2791        };
2792
2793        insert_node(&conn, &active_node).unwrap();
2794        insert_node(&conn, &pending_node).unwrap();
2795        insert_node(&conn, &dismissed_node).unwrap();
2796
2797        let pending = get_nodes_by_status(&conn, &NodeStatus::PendingReview).unwrap();
2798        assert_eq!(pending.len(), 1);
2799        assert_eq!(pending[0].id, "n2");
2800
2801        let active = get_nodes_by_status(&conn, &NodeStatus::Active).unwrap();
2802        assert_eq!(active.len(), 1);
2803        assert_eq!(active[0].id, "n1");
2804
2805        let pending2 = KnowledgeNode {
2806            id: "n4".to_string(),
2807            node_type: NodeType::Rule,
2808            scope: NodeScope::Project,
2809            project_id: Some("other".to_string()),
2810            content: "Second pending".to_string(),
2811            confidence: 0.95,
2812            status: NodeStatus::PendingReview,
2813            created_at: Utc::now(),
2814            updated_at: Utc::now(),
2815            projected_at: None,
2816            pr_url: None,
2817        };
2818        insert_node(&conn, &pending2).unwrap();
2819        let pending_all = get_nodes_by_status(&conn, &NodeStatus::PendingReview).unwrap();
2820        assert_eq!(pending_all.len(), 2);
2821        assert_eq!(pending_all[0].id, "n4"); // Higher confidence first
2822    }
2823
2824    #[test]
2825    fn test_migrate_v4_to_v5_adds_projection_columns() {
2826        let conn = Connection::open_in_memory().unwrap();
2827        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2828        migrate(&conn).unwrap();
2829
2830        // Insert a node — should support new columns
2831        let node = KnowledgeNode {
2832            id: "test-1".to_string(),
2833            node_type: NodeType::Rule,
2834            scope: NodeScope::Global,
2835            project_id: None,
2836            content: "test rule".to_string(),
2837            confidence: 0.8,
2838            status: NodeStatus::Active,
2839            created_at: Utc::now(),
2840            updated_at: Utc::now(),
2841            projected_at: None,
2842            pr_url: None,
2843        };
2844        insert_node(&conn, &node).unwrap();
2845
2846        let retrieved = get_node(&conn, "test-1").unwrap().unwrap();
2847        assert!(retrieved.projected_at.is_none());
2848        assert!(retrieved.pr_url.is_none());
2849
2850        // Verify schema version
2851        let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0)).unwrap();
2852        assert_eq!(version, 6);
2853    }
2854
2855    fn test_node(id: &str, status: NodeStatus, projected_at: Option<String>, pr_url: Option<String>) -> KnowledgeNode {
2856        KnowledgeNode {
2857            id: id.to_string(),
2858            node_type: NodeType::Rule,
2859            scope: NodeScope::Global,
2860            project_id: None,
2861            content: format!("Content for {}", id),
2862            confidence: 0.8,
2863            status,
2864            created_at: Utc::now(),
2865            updated_at: Utc::now(),
2866            projected_at,
2867            pr_url,
2868        }
2869    }
2870
2871    #[test]
2872    fn test_get_unprojected_nodes() {
2873        let conn = test_db();
2874
2875        // Active node with no projected_at — should be returned
2876        let active_unprojected = test_node("n1", NodeStatus::Active, None, None);
2877        // Active node with projected_at set — should NOT be returned
2878        let active_projected = test_node("n2", NodeStatus::Active, Some(Utc::now().to_rfc3339()), None);
2879        // PendingReview node with no projected_at — should NOT be returned (wrong status)
2880        let pending = test_node("n3", NodeStatus::PendingReview, None, None);
2881
2882        insert_node(&conn, &active_unprojected).unwrap();
2883        insert_node(&conn, &active_projected).unwrap();
2884        insert_node(&conn, &pending).unwrap();
2885
2886        let nodes = get_unprojected_nodes(&conn).unwrap();
2887        assert_eq!(nodes.len(), 1);
2888        assert_eq!(nodes[0].id, "n1");
2889    }
2890
2891    #[test]
2892    fn test_mark_node_projected() {
2893        let conn = test_db();
2894
2895        let node = test_node("n1", NodeStatus::Active, None, None);
2896        insert_node(&conn, &node).unwrap();
2897
2898        mark_node_projected(&conn, "n1").unwrap();
2899
2900        let retrieved = get_node(&conn, "n1").unwrap().unwrap();
2901        assert!(retrieved.projected_at.is_some());
2902        assert!(retrieved.pr_url.is_none());
2903    }
2904
2905    #[test]
2906    fn test_mark_node_projected_with_pr() {
2907        let conn = test_db();
2908
2909        let node = test_node("n1", NodeStatus::Active, None, None);
2910        insert_node(&conn, &node).unwrap();
2911
2912        mark_node_projected_with_pr(&conn, "n1", "https://github.com/test/pull/42").unwrap();
2913
2914        let retrieved = get_node(&conn, "n1").unwrap().unwrap();
2915        assert!(retrieved.projected_at.is_some());
2916        assert_eq!(retrieved.pr_url, Some("https://github.com/test/pull/42".to_string()));
2917    }
2918
2919    #[test]
2920    fn test_dismiss_nodes_for_pr() {
2921        let conn = test_db();
2922
2923        let pr_url = "https://github.com/test/pull/99";
2924        let node1 = test_node("n1", NodeStatus::Active, Some(Utc::now().to_rfc3339()), Some(pr_url.to_string()));
2925        let node2 = test_node("n2", NodeStatus::Active, Some(Utc::now().to_rfc3339()), Some(pr_url.to_string()));
2926
2927        insert_node(&conn, &node1).unwrap();
2928        insert_node(&conn, &node2).unwrap();
2929
2930        dismiss_nodes_for_pr(&conn, pr_url).unwrap();
2931
2932        let n1 = get_node(&conn, "n1").unwrap().unwrap();
2933        let n2 = get_node(&conn, "n2").unwrap().unwrap();
2934
2935        assert_eq!(n1.status, NodeStatus::Dismissed);
2936        assert!(n1.pr_url.is_none());
2937        assert_eq!(n2.status, NodeStatus::Dismissed);
2938        assert!(n2.pr_url.is_none());
2939    }
2940
2941    #[test]
2942    fn test_clear_node_pr() {
2943        let conn = test_db();
2944
2945        let pr_url = "https://github.com/test/pull/7";
2946        let node = test_node("n1", NodeStatus::Active, Some(Utc::now().to_rfc3339()), Some(pr_url.to_string()));
2947        insert_node(&conn, &node).unwrap();
2948
2949        clear_node_pr(&conn, pr_url).unwrap();
2950
2951        let retrieved = get_node(&conn, "n1").unwrap().unwrap();
2952        assert!(retrieved.pr_url.is_none());
2953        assert_eq!(retrieved.status, NodeStatus::Active);
2954    }
2955}