Skip to main content

retro_core/
db.rs

1use crate::errors::CoreError;
2use crate::models::{
3    IngestedSession, KnowledgeEdge, KnowledgeNode, KnowledgeProject,
4    EdgeType, GraphOperation, NodeScope, NodeStatus, NodeType,
5    Pattern, PatternStatus, PatternType, Projection, ProjectionStatus, SuggestedTarget,
6};
7use chrono::{DateTime, Utc};
8pub use rusqlite::Connection;
9use rusqlite::params;
10use rusqlite::OptionalExtension;
11use std::path::Path;
12
/// Schema version this module targets.
///
/// NOTE(review): `migrate` advances `user_version` with literal step numbers
/// (currently up to 5) rather than reading this constant — keep the two in sync.
const SCHEMA_VERSION: u32 = 5;
14
15/// Open (or create) the retro database with WAL mode enabled.
16pub fn open_db(path: &Path) -> Result<Connection, CoreError> {
17    let conn = Connection::open(path)?;
18
19    // Enable WAL mode for concurrent access
20    conn.pragma_update(None, "journal_mode", "WAL")?;
21
22    // Run migrations
23    migrate(&conn)?;
24
25    Ok(conn)
26}
27
28/// Initialize schema on an existing connection (for testing with in-memory DBs).
29pub fn init_db(conn: &Connection) -> Result<(), CoreError> {
30    migrate(conn)
31}
32
33fn migrate(conn: &Connection) -> Result<(), CoreError> {
34    let current_version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0))?;
35
36    if current_version < 1 {
37        conn.execute_batch(
38            "
39            CREATE TABLE IF NOT EXISTS patterns (
40                id TEXT PRIMARY KEY,
41                pattern_type TEXT NOT NULL,
42                description TEXT NOT NULL,
43                confidence REAL NOT NULL,
44                times_seen INTEGER NOT NULL DEFAULT 1,
45                first_seen TEXT NOT NULL,
46                last_seen TEXT NOT NULL,
47                last_projected TEXT,
48                status TEXT NOT NULL DEFAULT 'discovered',
49                source_sessions TEXT NOT NULL,
50                related_files TEXT NOT NULL,
51                suggested_content TEXT NOT NULL,
52                suggested_target TEXT NOT NULL,
53                project TEXT,
54                generation_failed INTEGER NOT NULL DEFAULT 0
55            );
56
57            CREATE TABLE IF NOT EXISTS projections (
58                id TEXT PRIMARY KEY,
59                pattern_id TEXT NOT NULL REFERENCES patterns(id),
60                target_type TEXT NOT NULL,
61                target_path TEXT NOT NULL,
62                content TEXT NOT NULL,
63                applied_at TEXT NOT NULL,
64                pr_url TEXT,
65                nudged INTEGER NOT NULL DEFAULT 0
66            );
67
68            CREATE TABLE IF NOT EXISTS analyzed_sessions (
69                session_id TEXT PRIMARY KEY,
70                project TEXT NOT NULL,
71                analyzed_at TEXT NOT NULL
72            );
73
74            CREATE TABLE IF NOT EXISTS ingested_sessions (
75                session_id TEXT PRIMARY KEY,
76                project TEXT NOT NULL,
77                session_path TEXT NOT NULL,
78                file_size INTEGER NOT NULL,
79                file_mtime TEXT NOT NULL,
80                ingested_at TEXT NOT NULL
81            );
82
83            CREATE INDEX IF NOT EXISTS idx_patterns_status ON patterns(status);
84            CREATE INDEX IF NOT EXISTS idx_patterns_type ON patterns(pattern_type);
85            CREATE INDEX IF NOT EXISTS idx_patterns_target ON patterns(suggested_target);
86            CREATE INDEX IF NOT EXISTS idx_patterns_project ON patterns(project);
87            CREATE INDEX IF NOT EXISTS idx_projections_pattern ON projections(pattern_id);
88            ",
89        )?;
90
91        conn.pragma_update(None, "user_version", 1)?;
92    }
93
94    if current_version < 2 {
95        conn.execute_batch(
96            "
97            CREATE TABLE IF NOT EXISTS metadata (
98                key TEXT PRIMARY KEY,
99                value TEXT NOT NULL
100            );
101            ",
102        )?;
103        conn.pragma_update(None, "user_version", 2)?;
104    }
105
106    if current_version < 3 {
107        conn.execute_batch(
108            "ALTER TABLE projections ADD COLUMN status TEXT NOT NULL DEFAULT 'applied';",
109        )?;
110        conn.pragma_update(None, "user_version", 3)?;
111    }
112
113    if current_version < 4 {
114        conn.execute_batch(
115            "
116            CREATE TABLE IF NOT EXISTS nodes (
117                id TEXT PRIMARY KEY,
118                type TEXT NOT NULL,
119                scope TEXT NOT NULL,
120                project_id TEXT,
121                content TEXT NOT NULL,
122                confidence REAL NOT NULL,
123                status TEXT NOT NULL,
124                created_at TEXT NOT NULL,
125                updated_at TEXT NOT NULL,
126                projected_at TEXT,
127                pr_url TEXT
128            );
129
130            CREATE INDEX IF NOT EXISTS idx_nodes_scope_project ON nodes(scope, project_id, status);
131            CREATE INDEX IF NOT EXISTS idx_nodes_type_status ON nodes(type, status);
132
133            CREATE TABLE IF NOT EXISTS edges (
134                source_id TEXT NOT NULL,
135                target_id TEXT NOT NULL,
136                type TEXT NOT NULL,
137                created_at TEXT NOT NULL,
138                PRIMARY KEY (source_id, target_id, type)
139            );
140
141            CREATE INDEX IF NOT EXISTS idx_edges_target ON edges(target_id, type);
142            CREATE INDEX IF NOT EXISTS idx_edges_source ON edges(source_id, type);
143
144            CREATE TABLE IF NOT EXISTS projects (
145                id TEXT PRIMARY KEY,
146                path TEXT NOT NULL,
147                remote_url TEXT,
148                agent_type TEXT NOT NULL DEFAULT 'claude_code',
149                last_seen TEXT NOT NULL
150            );
151            ",
152        )?;
153
154        // Migrate existing v1 patterns to v2 nodes
155        migrate_patterns_to_nodes(conn)?;
156
157        conn.pragma_update(None, "user_version", 4)?;
158    }
159
160    if current_version < 5 {
161        // For databases upgraded from v4, we need to add the new columns.
162        // For fresh installs, v4 CREATE TABLE already includes them.
163        let has_projected_at: bool = conn
164            .prepare("SELECT projected_at FROM nodes LIMIT 0")
165            .is_ok();
166        if !has_projected_at {
167            conn.execute_batch(
168                "ALTER TABLE nodes ADD COLUMN projected_at TEXT;
169                 ALTER TABLE nodes ADD COLUMN pr_url TEXT;"
170            )?;
171        }
172        conn.pragma_update(None, "user_version", 5)?;
173    }
174
175    Ok(())
176}
177
178/// Check if a session has already been ingested and is up-to-date.
179pub fn is_session_ingested(
180    conn: &Connection,
181    session_id: &str,
182    file_size: u64,
183    file_mtime: &str,
184) -> Result<bool, CoreError> {
185    let mut stmt = conn.prepare(
186        "SELECT file_size, file_mtime FROM ingested_sessions WHERE session_id = ?1",
187    )?;
188
189    let result = stmt.query_row(params![session_id], |row| {
190        let size: u64 = row.get(0)?;
191        let mtime: String = row.get(1)?;
192        Ok((size, mtime))
193    });
194
195    match result {
196        Ok((size, mtime)) => Ok(size == file_size && mtime == file_mtime),
197        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
198        Err(e) => Err(CoreError::Database(e.to_string())),
199    }
200}
201
202/// Record a session as ingested.
203pub fn record_ingested_session(
204    conn: &Connection,
205    session: &IngestedSession,
206) -> Result<(), CoreError> {
207    conn.execute(
208        "INSERT OR REPLACE INTO ingested_sessions (session_id, project, session_path, file_size, file_mtime, ingested_at)
209         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
210        params![
211            session.session_id,
212            session.project,
213            session.session_path,
214            session.file_size,
215            session.file_mtime,
216            session.ingested_at.to_rfc3339(),
217        ],
218    )?;
219    Ok(())
220}
221
222/// Get the count of ingested sessions.
223pub fn ingested_session_count(conn: &Connection) -> Result<u64, CoreError> {
224    let count: u64 =
225        conn.query_row("SELECT COUNT(*) FROM ingested_sessions", [], |row| {
226            row.get(0)
227        })?;
228    Ok(count)
229}
230
231/// Get the count of ingested sessions for a specific project.
232pub fn ingested_session_count_for_project(
233    conn: &Connection,
234    project: &str,
235) -> Result<u64, CoreError> {
236    let count: u64 = conn.query_row(
237        "SELECT COUNT(*) FROM ingested_sessions WHERE project = ?1",
238        params![project],
239        |row| row.get(0),
240    )?;
241    Ok(count)
242}
243
244/// Get the count of analyzed sessions.
245pub fn analyzed_session_count(conn: &Connection) -> Result<u64, CoreError> {
246    let count: u64 =
247        conn.query_row("SELECT COUNT(*) FROM analyzed_sessions", [], |row| {
248            row.get(0)
249        })?;
250    Ok(count)
251}
252
253/// Get the count of patterns by status.
254pub fn pattern_count_by_status(conn: &Connection, status: &str) -> Result<u64, CoreError> {
255    let count: u64 = conn.query_row(
256        "SELECT COUNT(*) FROM patterns WHERE status = ?1",
257        params![status],
258        |row| row.get(0),
259    )?;
260    Ok(count)
261}
262
263/// Get the most recent ingestion timestamp.
264pub fn last_ingested_at(conn: &Connection) -> Result<Option<String>, CoreError> {
265    let result = conn.query_row(
266        "SELECT MAX(ingested_at) FROM ingested_sessions",
267        [],
268        |row| row.get::<_, Option<String>>(0),
269    )?;
270    Ok(result)
271}
272
273/// Get the most recent analysis timestamp.
274pub fn last_analyzed_at(conn: &Connection) -> Result<Option<String>, CoreError> {
275    let result = conn.query_row(
276        "SELECT MAX(analyzed_at) FROM analyzed_sessions",
277        [],
278        |row| row.get::<_, Option<String>>(0),
279    )?;
280    Ok(result)
281}
282
283/// Get the most recent projection (apply) timestamp.
284pub fn last_applied_at(conn: &Connection) -> Result<Option<String>, CoreError> {
285    let result = conn.query_row(
286        "SELECT MAX(applied_at) FROM projections",
287        [],
288        |row| row.get::<_, Option<String>>(0),
289    )?;
290    Ok(result)
291}
292
293/// Check if there are ingested sessions that haven't been analyzed yet.
294pub fn has_unanalyzed_sessions(conn: &Connection) -> Result<bool, CoreError> {
295    let count: u64 = conn.query_row(
296        "SELECT COUNT(*) FROM ingested_sessions i
297         LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
298         WHERE a.session_id IS NULL",
299        [],
300        |row| row.get(0),
301    )?;
302    Ok(count > 0)
303}
304
305/// Count ingested sessions that haven't been analyzed yet.
306pub fn unanalyzed_session_count(conn: &Connection) -> Result<u64, CoreError> {
307    let count: u64 = conn.query_row(
308        "SELECT COUNT(*) FROM ingested_sessions i
309         LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
310         WHERE a.session_id IS NULL",
311        [],
312        |row| row.get(0),
313    )?;
314    Ok(count)
315}
316
317/// Check if there are patterns eligible for projection that haven't been projected yet.
318/// Mirrors the gating logic in `get_qualifying_patterns()`: excludes patterns with
319/// generation_failed=true, suggested_target='db_only', or confidence below threshold.
320/// The confidence threshold is the sole quality gate (no times_seen requirement).
321pub fn has_unprojected_patterns(conn: &Connection, confidence_threshold: f64) -> Result<bool, CoreError> {
322    let count: u64 = conn.query_row(
323        "SELECT COUNT(*) FROM patterns p
324         LEFT JOIN projections pr ON p.id = pr.pattern_id
325         WHERE pr.id IS NULL
326         AND p.status IN ('discovered', 'active')
327         AND p.generation_failed = 0
328         AND p.suggested_target != 'db_only'
329         AND p.confidence >= ?1",
330        [confidence_threshold],
331        |row| row.get(0),
332    )?;
333    Ok(count > 0)
334}
335
336/// Get the last nudge timestamp from metadata.
337pub fn get_last_nudge_at(conn: &Connection) -> Result<Option<DateTime<Utc>>, CoreError> {
338    let result: Option<String> = conn
339        .query_row(
340            "SELECT value FROM metadata WHERE key = 'last_nudge_at'",
341            [],
342            |row| row.get(0),
343        )
344        .optional()?;
345
346    match result {
347        Some(s) => match DateTime::parse_from_rfc3339(&s) {
348            Ok(dt) => Ok(Some(dt.with_timezone(&Utc))),
349            Err(_) => Ok(None),
350        },
351        None => Ok(None),
352    }
353}
354
355/// Set the last nudge timestamp in metadata.
356pub fn set_last_nudge_at(conn: &Connection, timestamp: &DateTime<Utc>) -> Result<(), CoreError> {
357    conn.execute(
358        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('last_nudge_at', ?1)",
359        params![timestamp.to_rfc3339()],
360    )?;
361    Ok(())
362}
363
364/// Get a value from the metadata table by key.
365pub fn get_metadata(conn: &Connection, key: &str) -> Result<Option<String>, CoreError> {
366    let result: Option<String> = conn
367        .query_row(
368            "SELECT value FROM metadata WHERE key = ?1",
369            params![key],
370            |row| row.get(0),
371        )
372        .optional()?;
373    Ok(result)
374}
375
376/// Set a value in the metadata table (insert or replace).
377pub fn set_metadata(conn: &Connection, key: &str, value: &str) -> Result<(), CoreError> {
378    conn.execute(
379        "INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)",
380        params![key, value],
381    )?;
382    Ok(())
383}
384
385/// Verify the database is using WAL mode.
386pub fn verify_wal_mode(conn: &Connection) -> Result<bool, CoreError> {
387    let mode: String = conn.pragma_query_value(None, "journal_mode", |row| row.get(0))?;
388    Ok(mode.to_lowercase() == "wal")
389}
390
391/// Get all distinct projects from ingested sessions.
392pub fn list_projects(conn: &Connection) -> Result<Vec<String>, CoreError> {
393    let mut stmt =
394        conn.prepare("SELECT DISTINCT project FROM ingested_sessions ORDER BY project")?;
395    let projects = stmt
396        .query_map([], |row| row.get(0))?
397        .filter_map(|r| r.ok())
398        .collect();
399    Ok(projects)
400}
401
402// ── Pattern operations ──
403
/// Column list used by the pattern SELECT queries.
///
/// `read_pattern_row` reads these columns by position (0 = `id` through
/// 14 = `generation_failed`), so the order here must stay in step with it.
const PATTERN_COLUMNS: &str = "id, pattern_type, description, confidence, times_seen, first_seen, last_seen, last_projected, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed";
405
406/// Insert a new pattern into the database.
407pub fn insert_pattern(conn: &Connection, pattern: &Pattern) -> Result<(), CoreError> {
408    let source_sessions =
409        serde_json::to_string(&pattern.source_sessions).unwrap_or_else(|_| "[]".to_string());
410    let related_files =
411        serde_json::to_string(&pattern.related_files).unwrap_or_else(|_| "[]".to_string());
412
413    conn.execute(
414        "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, last_projected, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
415         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
416        params![
417            pattern.id,
418            pattern.pattern_type.to_string(),
419            pattern.description,
420            pattern.confidence,
421            pattern.times_seen,
422            pattern.first_seen.to_rfc3339(),
423            pattern.last_seen.to_rfc3339(),
424            pattern.last_projected.map(|t| t.to_rfc3339()),
425            pattern.status.to_string(),
426            source_sessions,
427            related_files,
428            pattern.suggested_content,
429            pattern.suggested_target.to_string(),
430            pattern.project,
431            pattern.generation_failed as i32,
432        ],
433    )?;
434    Ok(())
435}
436
437/// Update an existing pattern with new evidence (merge).
438pub fn update_pattern_merge(
439    conn: &Connection,
440    id: &str,
441    new_sessions: &[String],
442    new_confidence: f64,
443    new_last_seen: DateTime<Utc>,
444    additional_times_seen: i64,
445) -> Result<(), CoreError> {
446    // Load existing source_sessions and merge
447    let existing_sessions: String = conn.query_row(
448        "SELECT source_sessions FROM patterns WHERE id = ?1",
449        params![id],
450        |row| row.get(0),
451    )?;
452
453    let mut sessions: Vec<String> =
454        serde_json::from_str(&existing_sessions).unwrap_or_default();
455    for s in new_sessions {
456        if !sessions.contains(s) {
457            sessions.push(s.clone());
458        }
459    }
460    let merged_sessions = serde_json::to_string(&sessions).unwrap_or_else(|_| "[]".to_string());
461
462    conn.execute(
463        "UPDATE patterns SET
464            confidence = MAX(confidence, ?2),
465            times_seen = times_seen + ?3,
466            last_seen = ?4,
467            source_sessions = ?5
468         WHERE id = ?1",
469        params![
470            id,
471            new_confidence,
472            additional_times_seen,
473            new_last_seen.to_rfc3339(),
474            merged_sessions,
475        ],
476    )?;
477    Ok(())
478}
479
480/// Get patterns filtered by status and optionally by project.
481pub fn get_patterns(
482    conn: &Connection,
483    statuses: &[&str],
484    project: Option<&str>,
485) -> Result<Vec<Pattern>, CoreError> {
486    if statuses.is_empty() {
487        return Ok(Vec::new());
488    }
489
490    let placeholders: Vec<String> = statuses.iter().enumerate().map(|(i, _)| format!("?{}", i + 1)).collect();
491    let status_clause = placeholders.join(", ");
492
493    let (query, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match project {
494        Some(proj) => {
495            let q = format!(
496                "SELECT {PATTERN_COLUMNS}
497                 FROM patterns WHERE status IN ({}) AND (project = ?{} OR project IS NULL)
498                 ORDER BY confidence DESC",
499                status_clause,
500                statuses.len() + 1
501            );
502            let mut p: Vec<Box<dyn rusqlite::types::ToSql>> = statuses.iter().map(|s| Box::new(s.to_string()) as Box<dyn rusqlite::types::ToSql>).collect();
503            p.push(Box::new(proj.to_string()));
504            (q, p)
505        }
506        None => {
507            let q = format!(
508                "SELECT {PATTERN_COLUMNS}
509                 FROM patterns WHERE status IN ({})
510                 ORDER BY confidence DESC",
511                status_clause
512            );
513            let p: Vec<Box<dyn rusqlite::types::ToSql>> = statuses.iter().map(|s| Box::new(s.to_string()) as Box<dyn rusqlite::types::ToSql>).collect();
514            (q, p)
515        }
516    };
517
518    let params_refs: Vec<&dyn rusqlite::types::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
519    let mut stmt = conn.prepare(&query)?;
520    let patterns = stmt
521        .query_map(params_refs.as_slice(), |row| {
522            Ok(read_pattern_row(row))
523        })?
524        .filter_map(|r| r.ok())
525        .collect();
526
527    Ok(patterns)
528}
529
530/// Get all patterns, optionally filtered by project.
531pub fn get_all_patterns(conn: &Connection, project: Option<&str>) -> Result<Vec<Pattern>, CoreError> {
532    let (query, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match project {
533        Some(proj) => {
534            let q = format!(
535                "SELECT {PATTERN_COLUMNS}
536                 FROM patterns WHERE project = ?1 OR project IS NULL
537                 ORDER BY confidence DESC"
538            );
539            (q, vec![Box::new(proj.to_string()) as Box<dyn rusqlite::types::ToSql>])
540        }
541        None => {
542            let q = format!(
543                "SELECT {PATTERN_COLUMNS}
544                 FROM patterns ORDER BY confidence DESC"
545            );
546            (q, vec![])
547        }
548    };
549
550    let params_refs: Vec<&dyn rusqlite::types::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
551    let mut stmt = conn.prepare(&query)?;
552    let patterns = stmt
553        .query_map(params_refs.as_slice(), |row| Ok(read_pattern_row(row)))?
554        .filter_map(|r| r.ok())
555        .collect();
556
557    Ok(patterns)
558}
559
560fn read_pattern_row(row: &rusqlite::Row<'_>) -> Pattern {
561    let source_sessions_str: String = row.get(9).unwrap_or_default();
562    let related_files_str: String = row.get(10).unwrap_or_default();
563    let first_seen_str: String = row.get(5).unwrap_or_default();
564    let last_seen_str: String = row.get(6).unwrap_or_default();
565    let last_projected_str: Option<String> = row.get(7).unwrap_or(None);
566    let gen_failed: i32 = row.get(14).unwrap_or(0);
567
568    Pattern {
569        id: row.get(0).unwrap_or_default(),
570        pattern_type: PatternType::from_str(&row.get::<_, String>(1).unwrap_or_default()),
571        description: row.get(2).unwrap_or_default(),
572        confidence: row.get(3).unwrap_or(0.0),
573        times_seen: row.get(4).unwrap_or(1),
574        first_seen: DateTime::parse_from_rfc3339(&first_seen_str)
575            .map(|d| d.with_timezone(&Utc))
576            .unwrap_or_else(|_| Utc::now()),
577        last_seen: DateTime::parse_from_rfc3339(&last_seen_str)
578            .map(|d| d.with_timezone(&Utc))
579            .unwrap_or_else(|_| Utc::now()),
580        last_projected: last_projected_str
581            .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
582            .map(|d| d.with_timezone(&Utc)),
583        status: PatternStatus::from_str(&row.get::<_, String>(8).unwrap_or_default()),
584        source_sessions: serde_json::from_str(&source_sessions_str).unwrap_or_default(),
585        related_files: serde_json::from_str(&related_files_str).unwrap_or_default(),
586        suggested_content: row.get(11).unwrap_or_default(),
587        suggested_target: SuggestedTarget::from_str(&row.get::<_, String>(12).unwrap_or_default()),
588        project: row.get(13).unwrap_or(None),
589        generation_failed: gen_failed != 0,
590    }
591}
592
593// ── Analyzed session tracking ──
594
595/// Record a session as analyzed.
596pub fn record_analyzed_session(
597    conn: &Connection,
598    session_id: &str,
599    project: &str,
600) -> Result<(), CoreError> {
601    conn.execute(
602        "INSERT OR REPLACE INTO analyzed_sessions (session_id, project, analyzed_at)
603         VALUES (?1, ?2, ?3)",
604        params![session_id, project, Utc::now().to_rfc3339()],
605    )?;
606    Ok(())
607}
608
609/// Check if a session has been analyzed.
610pub fn is_session_analyzed(conn: &Connection, session_id: &str) -> Result<bool, CoreError> {
611    let count: u64 = conn.query_row(
612        "SELECT COUNT(*) FROM analyzed_sessions WHERE session_id = ?1",
613        params![session_id],
614        |row| row.get(0),
615    )?;
616    Ok(count > 0)
617}
618
619/// Get ingested sessions for analysis within the time window.
620/// When `rolling_window` is true, returns ALL sessions in the window (re-analyzes everything).
621/// When false, only returns sessions not yet in `analyzed_sessions` (analyze-once).
622pub fn get_sessions_for_analysis(
623    conn: &Connection,
624    project: Option<&str>,
625    since: &DateTime<Utc>,
626    rolling_window: bool,
627) -> Result<Vec<IngestedSession>, CoreError> {
628    let since_str = since.to_rfc3339();
629
630    let (query, params_vec): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match (project, rolling_window) {
631        (Some(proj), true) => {
632            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
633                     FROM ingested_sessions i
634                     WHERE i.project = ?1 AND i.ingested_at >= ?2
635                     ORDER BY i.ingested_at".to_string();
636            (q, vec![
637                Box::new(proj.to_string()) as Box<dyn rusqlite::types::ToSql>,
638                Box::new(since_str) as Box<dyn rusqlite::types::ToSql>,
639            ])
640        }
641        (Some(proj), false) => {
642            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
643                     FROM ingested_sessions i
644                     LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
645                     WHERE a.session_id IS NULL AND i.project = ?1 AND i.ingested_at >= ?2
646                     ORDER BY i.ingested_at".to_string();
647            (q, vec![
648                Box::new(proj.to_string()) as Box<dyn rusqlite::types::ToSql>,
649                Box::new(since_str) as Box<dyn rusqlite::types::ToSql>,
650            ])
651        }
652        (None, true) => {
653            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
654                     FROM ingested_sessions i
655                     WHERE i.ingested_at >= ?1
656                     ORDER BY i.ingested_at".to_string();
657            (q, vec![Box::new(since_str) as Box<dyn rusqlite::types::ToSql>])
658        }
659        (None, false) => {
660            let q = "SELECT i.session_id, i.project, i.session_path, i.file_size, i.file_mtime, i.ingested_at
661                     FROM ingested_sessions i
662                     LEFT JOIN analyzed_sessions a ON i.session_id = a.session_id
663                     WHERE a.session_id IS NULL AND i.ingested_at >= ?1
664                     ORDER BY i.ingested_at".to_string();
665            (q, vec![Box::new(since_str) as Box<dyn rusqlite::types::ToSql>])
666        }
667    };
668
669    let params_refs: Vec<&dyn rusqlite::types::ToSql> = params_vec.iter().map(|p| p.as_ref()).collect();
670    let mut stmt = conn.prepare(&query)?;
671    let sessions = stmt
672        .query_map(params_refs.as_slice(), |row| {
673            let ingested_at_str: String = row.get(5)?;
674            let ingested_at = DateTime::parse_from_rfc3339(&ingested_at_str)
675                .map(|d| d.with_timezone(&Utc))
676                .unwrap_or_else(|_| Utc::now());
677            Ok(IngestedSession {
678                session_id: row.get(0)?,
679                project: row.get(1)?,
680                session_path: row.get(2)?,
681                file_size: row.get(3)?,
682                file_mtime: row.get(4)?,
683                ingested_at,
684            })
685        })?
686        .filter_map(|r| r.ok())
687        .collect();
688
689    Ok(sessions)
690}
691
692// ── Projection operations ──
693
694/// Insert a new projection record.
695pub fn insert_projection(conn: &Connection, proj: &Projection) -> Result<(), CoreError> {
696    conn.execute(
697        "INSERT INTO projections (id, pattern_id, target_type, target_path, content, applied_at, pr_url, status)
698         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
699        params![
700            proj.id,
701            proj.pattern_id,
702            proj.target_type,
703            proj.target_path,
704            proj.content,
705            proj.applied_at.to_rfc3339(),
706            proj.pr_url,
707            proj.status.to_string(),
708        ],
709    )?;
710    Ok(())
711}
712
713/// Check if a pattern already has an active projection.
714pub fn has_projection_for_pattern(conn: &Connection, pattern_id: &str) -> Result<bool, CoreError> {
715    let count: u64 = conn.query_row(
716        "SELECT COUNT(*) FROM projections WHERE pattern_id = ?1",
717        params![pattern_id],
718        |row| row.get(0),
719    )?;
720    Ok(count > 0)
721}
722
723/// Get the set of all pattern IDs that already have projections.
724pub fn get_projected_pattern_ids(
725    conn: &Connection,
726) -> Result<std::collections::HashSet<String>, CoreError> {
727    let mut stmt = conn.prepare("SELECT DISTINCT pattern_id FROM projections")?;
728    let ids = stmt
729        .query_map([], |row| row.get(0))?
730        .filter_map(|r| r.ok())
731        .collect();
732    Ok(ids)
733}
734
735/// Update a pattern's status.
736pub fn update_pattern_status(
737    conn: &Connection,
738    id: &str,
739    status: &PatternStatus,
740) -> Result<(), CoreError> {
741    conn.execute(
742        "UPDATE patterns SET status = ?2 WHERE id = ?1",
743        params![id, status.to_string()],
744    )?;
745    Ok(())
746}
747
748/// Set or clear the generation_failed flag on a pattern.
749pub fn set_generation_failed(
750    conn: &Connection,
751    id: &str,
752    failed: bool,
753) -> Result<(), CoreError> {
754    conn.execute(
755        "UPDATE patterns SET generation_failed = ?2 WHERE id = ?1",
756        params![id, failed as i32],
757    )?;
758    Ok(())
759}
760
761/// Get all projections for active patterns (for staleness detection).
762pub fn get_projections_for_active_patterns(
763    conn: &Connection,
764) -> Result<Vec<Projection>, CoreError> {
765    let mut stmt = conn.prepare(
766        "SELECT p.id, p.pattern_id, p.target_type, p.target_path, p.content, p.applied_at, p.pr_url, p.status
767         FROM projections p
768         INNER JOIN patterns pat ON p.pattern_id = pat.id
769         WHERE pat.status = 'active'",
770    )?;
771
772    let projections = stmt
773        .query_map([], |row| {
774            let applied_at_str: String = row.get(5)?;
775            let applied_at = DateTime::parse_from_rfc3339(&applied_at_str)
776                .map(|d| d.with_timezone(&Utc))
777                .unwrap_or_else(|_| Utc::now());
778            let status_str: String = row.get(7)?;
779            let status = ProjectionStatus::from_str(&status_str)
780                .unwrap_or(ProjectionStatus::Applied);
781            Ok(Projection {
782                id: row.get(0)?,
783                pattern_id: row.get(1)?,
784                target_type: row.get(2)?,
785                target_path: row.get(3)?,
786                content: row.get(4)?,
787                applied_at,
788                pr_url: row.get(6)?,
789                status,
790            })
791        })?
792        .filter_map(|r| r.ok())
793        .collect();
794
795    Ok(projections)
796}
797
798/// Update a pattern's last_projected timestamp to now.
799pub fn update_pattern_last_projected(conn: &Connection, id: &str) -> Result<(), CoreError> {
800    conn.execute(
801        "UPDATE patterns SET last_projected = ?2 WHERE id = ?1",
802        params![id, Utc::now().to_rfc3339()],
803    )?;
804    Ok(())
805}
806
807/// Get all projections with pending_review status.
808pub fn get_pending_review_projections(conn: &Connection) -> Result<Vec<Projection>, CoreError> {
809    let mut stmt = conn.prepare(
810        "SELECT p.id, p.pattern_id, p.target_type, p.target_path, p.content, p.applied_at, p.pr_url, p.status
811         FROM projections p
812         WHERE p.status = 'pending_review'
813         ORDER BY p.applied_at ASC",
814    )?;
815
816    let projections = stmt
817        .query_map([], |row| {
818            let applied_at_str: String = row.get(5)?;
819            let applied_at = DateTime::parse_from_rfc3339(&applied_at_str)
820                .map(|d| d.with_timezone(&Utc))
821                .unwrap_or_else(|_| Utc::now());
822            let status_str: String = row.get(7)?;
823            let status = ProjectionStatus::from_str(&status_str)
824                .unwrap_or(ProjectionStatus::PendingReview);
825            Ok(Projection {
826                id: row.get(0)?,
827                pattern_id: row.get(1)?,
828                target_type: row.get(2)?,
829                target_path: row.get(3)?,
830                content: row.get(4)?,
831                applied_at,
832                pr_url: row.get(6)?,
833                status,
834            })
835        })?
836        .filter_map(|r| r.ok())
837        .collect();
838
839    Ok(projections)
840}
841
842/// Update a projection's status.
843pub fn update_projection_status(
844    conn: &Connection,
845    projection_id: &str,
846    status: &ProjectionStatus,
847) -> Result<(), CoreError> {
848    conn.execute(
849        "UPDATE projections SET status = ?2 WHERE id = ?1",
850        params![projection_id, status.to_string()],
851    )?;
852    Ok(())
853}
854
855/// Delete a projection record.
856pub fn delete_projection(conn: &Connection, projection_id: &str) -> Result<(), CoreError> {
857    conn.execute("DELETE FROM projections WHERE id = ?1", params![projection_id])?;
858    Ok(())
859}
860
861/// Get applied projections that have a PR URL (for sync).
862pub fn get_applied_projections_with_pr(conn: &Connection) -> Result<Vec<Projection>, CoreError> {
863    let mut stmt = conn.prepare(
864        "SELECT p.id, p.pattern_id, p.target_type, p.target_path, p.content, p.applied_at, p.pr_url, p.status
865         FROM projections p
866         WHERE p.status = 'applied' AND p.pr_url IS NOT NULL",
867    )?;
868
869    let projections = stmt
870        .query_map([], |row| {
871            let applied_at_str: String = row.get(5)?;
872            let applied_at = DateTime::parse_from_rfc3339(&applied_at_str)
873                .map(|d| d.with_timezone(&Utc))
874                .unwrap_or_else(|_| Utc::now());
875            let status_str: String = row.get(7)?;
876            let status = ProjectionStatus::from_str(&status_str)
877                .unwrap_or(ProjectionStatus::Applied);
878            Ok(Projection {
879                id: row.get(0)?,
880                pattern_id: row.get(1)?,
881                target_type: row.get(2)?,
882                target_path: row.get(3)?,
883                content: row.get(4)?,
884                applied_at,
885                pr_url: row.get(6)?,
886                status,
887            })
888        })?
889        .filter_map(|r| r.ok())
890        .collect();
891
892    Ok(projections)
893}
894
895/// Get pattern IDs that have projections with specific statuses.
896pub fn get_projected_pattern_ids_by_status(
897    conn: &Connection,
898    statuses: &[ProjectionStatus],
899) -> Result<std::collections::HashSet<String>, CoreError> {
900    if statuses.is_empty() {
901        return Ok(std::collections::HashSet::new());
902    }
903    let placeholders: Vec<String> = statuses.iter().enumerate().map(|(i, _)| format!("?{}", i + 1)).collect();
904    let sql = format!(
905        "SELECT DISTINCT pattern_id FROM projections WHERE status IN ({})",
906        placeholders.join(", ")
907    );
908    let mut stmt = conn.prepare(&sql)?;
909    let params: Vec<String> = statuses.iter().map(|s| s.to_string()).collect();
910    let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|s| s as &dyn rusqlite::types::ToSql).collect();
911    let ids = stmt
912        .query_map(param_refs.as_slice(), |row| row.get(0))?
913        .filter_map(|r| r.ok())
914        .collect();
915    Ok(ids)
916}
917
918/// Update a projection's PR URL.
919pub fn update_projection_pr_url(
920    conn: &Connection,
921    projection_id: &str,
922    pr_url: &str,
923) -> Result<(), CoreError> {
924    conn.execute(
925        "UPDATE projections SET pr_url = ?2 WHERE id = ?1",
926        params![projection_id, pr_url],
927    )?;
928    Ok(())
929}
930
931// ── Knowledge graph: Migration ──
932
933/// Migrate v1 patterns to v2 knowledge nodes. Returns number of nodes created.
934/// Safe to call multiple times — skips patterns that already have corresponding nodes.
935pub fn migrate_patterns_to_nodes(conn: &Connection) -> Result<usize, CoreError> {
936    let mut stmt = conn.prepare(
937        "SELECT id, pattern_type, description, confidence, status, suggested_content, suggested_target, project, first_seen, last_seen
938         FROM patterns",
939    )?;
940    let patterns: Vec<(String, String, String, f64, String, String, String, Option<String>, String, String)> = stmt.query_map([], |row| {
941        Ok((
942            row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?,
943            row.get(4)?, row.get(5)?, row.get(6)?, row.get(7)?,
944            row.get(8)?, row.get(9)?,
945        ))
946    })?.filter_map(|r| r.ok()).collect();
947
948    let mut count = 0;
949    for (id, pattern_type, description, confidence, _status, suggested_content, suggested_target, project, first_seen, last_seen) in &patterns {
950        let node_id = format!("migrated-{id}");
951
952        // Skip if already migrated
953        if get_node(conn, &node_id)?.is_some() {
954            continue;
955        }
956
957        // Determine node type using the spec's deterministic mapping
958        let content_lower = suggested_content.to_lowercase();
959        let has_directive_keyword = content_lower.contains("always") || content_lower.contains("never");
960        let node_type = if *confidence >= 0.85 && has_directive_keyword {
961            NodeType::Directive
962        } else {
963            match (pattern_type.as_str(), suggested_target.as_str()) {
964                ("repetitive_instruction", "claude_md") => NodeType::Rule,
965                ("repetitive_instruction", "skill") => NodeType::Directive,
966                ("recurring_mistake", _) => NodeType::Pattern,
967                ("workflow_pattern", "skill") => NodeType::Skill,
968                ("workflow_pattern", "claude_md") => NodeType::Rule,
969                ("stale_context", _) => NodeType::Memory,
970                ("redundant_context", _) => NodeType::Memory,
971                _ => NodeType::Pattern,
972            }
973        };
974
975        let created_at = DateTime::parse_from_rfc3339(first_seen)
976            .unwrap_or_default()
977            .with_timezone(&Utc);
978        let updated_at = DateTime::parse_from_rfc3339(last_seen)
979            .unwrap_or_default()
980            .with_timezone(&Utc);
981
982        let node = KnowledgeNode {
983            id: node_id,
984            node_type,
985            scope: NodeScope::Project,
986            project_id: project.clone(),
987            content: description.clone(),
988            confidence: *confidence,
989            status: NodeStatus::Active,
990            created_at,
991            updated_at,
992            projected_at: None,
993            pr_url: None,
994        };
995        insert_node(conn, &node)?;
996        count += 1;
997    }
998    Ok(count)
999}
1000
1001// ── Knowledge graph: Node operations ──
1002
1003pub fn insert_node(conn: &Connection, node: &KnowledgeNode) -> Result<(), CoreError> {
1004    conn.execute(
1005        "INSERT INTO nodes (id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url)
1006         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
1007        params![
1008            node.id,
1009            node.node_type.to_string(),
1010            node.scope.to_string(),
1011            node.project_id,
1012            node.content,
1013            node.confidence,
1014            node.status.to_string(),
1015            node.created_at.to_rfc3339(),
1016            node.updated_at.to_rfc3339(),
1017            node.projected_at,
1018            node.pr_url,
1019        ],
1020    )?;
1021    Ok(())
1022}
1023
1024pub fn get_node(conn: &Connection, id: &str) -> Result<Option<KnowledgeNode>, CoreError> {
1025    let mut stmt = conn.prepare(
1026        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1027         FROM nodes WHERE id = ?1",
1028    )?;
1029    let result = stmt.query_row(params![id], |row| {
1030        Ok(KnowledgeNode {
1031            id: row.get(0)?,
1032            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1033            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1034            project_id: row.get(3)?,
1035            content: row.get(4)?,
1036            confidence: row.get(5)?,
1037            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1038            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1039                .unwrap_or_default()
1040                .with_timezone(&Utc),
1041            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1042                .unwrap_or_default()
1043                .with_timezone(&Utc),
1044            projected_at: row.get(9)?,
1045            pr_url: row.get(10)?,
1046        })
1047    }).optional()?;
1048    Ok(result)
1049}
1050
1051pub fn get_nodes_by_scope(
1052    conn: &Connection,
1053    scope: &NodeScope,
1054    project_id: Option<&str>,
1055    statuses: &[NodeStatus],
1056) -> Result<Vec<KnowledgeNode>, CoreError> {
1057    if statuses.is_empty() {
1058        return Ok(Vec::new());
1059    }
1060    let status_placeholders: Vec<String> = statuses.iter().enumerate().map(|(i, _)| format!("?{}", i + 3)).collect();
1061    let sql = format!(
1062        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1063         FROM nodes WHERE scope = ?1 AND (?2 IS NULL OR project_id = ?2) AND status IN ({})
1064         ORDER BY confidence DESC",
1065        status_placeholders.join(", ")
1066    );
1067    let mut stmt = conn.prepare(&sql)?;
1068    let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = vec![
1069        Box::new(scope.to_string()),
1070        Box::new(project_id.map(|s| s.to_string())),
1071    ];
1072    for s in statuses {
1073        params_vec.push(Box::new(s.to_string()));
1074    }
1075    let rows = stmt.query_map(rusqlite::params_from_iter(params_vec.iter().map(|p| p.as_ref())), |row| {
1076        Ok(KnowledgeNode {
1077            id: row.get(0)?,
1078            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1079            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1080            project_id: row.get(3)?,
1081            content: row.get(4)?,
1082            confidence: row.get(5)?,
1083            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1084            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1085                .unwrap_or_default()
1086                .with_timezone(&Utc),
1087            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1088                .unwrap_or_default()
1089                .with_timezone(&Utc),
1090            projected_at: row.get(9)?,
1091            pr_url: row.get(10)?,
1092        })
1093    })?;
1094    let mut nodes = Vec::new();
1095    for row in rows {
1096        nodes.push(row?);
1097    }
1098    Ok(nodes)
1099}
1100
1101/// Get all nodes with a given status, ordered by confidence DESC.
1102pub fn get_nodes_by_status(
1103    conn: &Connection,
1104    status: &NodeStatus,
1105) -> Result<Vec<KnowledgeNode>, CoreError> {
1106    let mut stmt = conn.prepare(
1107        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1108         FROM nodes WHERE status = ?1
1109         ORDER BY confidence DESC",
1110    )?;
1111    let rows = stmt.query_map(params![status.to_string()], |row| {
1112        Ok(KnowledgeNode {
1113            id: row.get(0)?,
1114            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1115            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1116            project_id: row.get(3)?,
1117            content: row.get(4)?,
1118            confidence: row.get(5)?,
1119            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1120            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1121                .unwrap_or_default()
1122                .with_timezone(&Utc),
1123            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1124                .unwrap_or_default()
1125                .with_timezone(&Utc),
1126            projected_at: row.get(9)?,
1127            pr_url: row.get(10)?,
1128        })
1129    })?;
1130    let mut nodes = Vec::new();
1131    for row in rows {
1132        nodes.push(row?);
1133    }
1134    Ok(nodes)
1135}
1136
1137pub fn update_node_confidence(conn: &Connection, id: &str, confidence: f64) -> Result<(), CoreError> {
1138    conn.execute(
1139        "UPDATE nodes SET confidence = ?1, updated_at = ?2 WHERE id = ?3",
1140        params![confidence, Utc::now().to_rfc3339(), id],
1141    )?;
1142    Ok(())
1143}
1144
1145pub fn update_node_status(conn: &Connection, id: &str, status: &NodeStatus) -> Result<(), CoreError> {
1146    conn.execute(
1147        "UPDATE nodes SET status = ?1, updated_at = ?2 WHERE id = ?3",
1148        params![status.to_string(), Utc::now().to_rfc3339(), id],
1149    )?;
1150    Ok(())
1151}
1152
1153pub fn update_node_content(conn: &Connection, id: &str, content: &str) -> Result<(), CoreError> {
1154    conn.execute(
1155        "UPDATE nodes SET content = ?1, updated_at = ?2 WHERE id = ?3",
1156        params![content, Utc::now().to_rfc3339(), id],
1157    )?;
1158    Ok(())
1159}
1160
1161// ── Knowledge graph: Edge operations ──
1162
1163pub fn insert_edge(conn: &Connection, edge: &KnowledgeEdge) -> Result<(), CoreError> {
1164    conn.execute(
1165        "INSERT OR IGNORE INTO edges (source_id, target_id, type, created_at)
1166         VALUES (?1, ?2, ?3, ?4)",
1167        params![
1168            edge.source_id,
1169            edge.target_id,
1170            edge.edge_type.to_string(),
1171            edge.created_at.to_rfc3339(),
1172        ],
1173    )?;
1174    Ok(())
1175}
1176
1177pub fn get_edges_from(conn: &Connection, source_id: &str) -> Result<Vec<KnowledgeEdge>, CoreError> {
1178    let mut stmt = conn.prepare(
1179        "SELECT source_id, target_id, type, created_at FROM edges WHERE source_id = ?1",
1180    )?;
1181    let rows = stmt.query_map(params![source_id], |row| {
1182        Ok(KnowledgeEdge {
1183            source_id: row.get(0)?,
1184            target_id: row.get(1)?,
1185            edge_type: EdgeType::from_str(&row.get::<_, String>(2)?).unwrap_or(EdgeType::Supports),
1186            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?)
1187                .unwrap_or_default()
1188                .with_timezone(&Utc),
1189        })
1190    })?;
1191    let mut edges = Vec::new();
1192    for row in rows {
1193        edges.push(row?);
1194    }
1195    Ok(edges)
1196}
1197
1198pub fn get_edges_to(conn: &Connection, target_id: &str) -> Result<Vec<KnowledgeEdge>, CoreError> {
1199    let mut stmt = conn.prepare(
1200        "SELECT source_id, target_id, type, created_at FROM edges WHERE target_id = ?1",
1201    )?;
1202    let rows = stmt.query_map(params![target_id], |row| {
1203        Ok(KnowledgeEdge {
1204            source_id: row.get(0)?,
1205            target_id: row.get(1)?,
1206            edge_type: EdgeType::from_str(&row.get::<_, String>(2)?).unwrap_or(EdgeType::Supports),
1207            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(3)?)
1208                .unwrap_or_default()
1209                .with_timezone(&Utc),
1210        })
1211    })?;
1212    let mut edges = Vec::new();
1213    for row in rows {
1214        edges.push(row?);
1215    }
1216    Ok(edges)
1217}
1218
1219pub fn delete_edge(conn: &Connection, source_id: &str, target_id: &str, edge_type: &EdgeType) -> Result<(), CoreError> {
1220    conn.execute(
1221        "DELETE FROM edges WHERE source_id = ?1 AND target_id = ?2 AND type = ?3",
1222        params![source_id, target_id, edge_type.to_string()],
1223    )?;
1224    Ok(())
1225}
1226
1227/// Mark new_id as superseding old_id: archives old node and creates supersedes edge.
1228pub fn supersede_node(conn: &Connection, new_id: &str, old_id: &str) -> Result<(), CoreError> {
1229    update_node_status(conn, old_id, &NodeStatus::Archived)?;
1230    let edge = KnowledgeEdge {
1231        source_id: new_id.to_string(),
1232        target_id: old_id.to_string(),
1233        edge_type: EdgeType::Supersedes,
1234        created_at: Utc::now(),
1235    };
1236    insert_edge(conn, &edge)?;
1237    Ok(())
1238}
1239
/// Counters describing what a batch of graph operations did.
///
/// All counters start at zero via `Default`.
#[derive(Clone, Debug, Default)]
pub struct ApplyGraphResult {
    /// Number of `CreateNode` operations applied.
    pub nodes_created: usize,
    /// Number of `UpdateNode` operations applied.
    pub nodes_updated: usize,
    /// Number of `CreateEdge` operations applied.
    pub edges_created: usize,
    /// Number of `MergeNodes` operations applied.
    pub nodes_merged: usize,
}
1248
1249/// Apply a batch of graph operations to the database.
1250pub fn apply_graph_operations(conn: &Connection, ops: &[GraphOperation]) -> Result<ApplyGraphResult, CoreError> {
1251    let mut result = ApplyGraphResult::default();
1252
1253    for op in ops {
1254        match op {
1255            GraphOperation::CreateNode { node_type, scope, project_id, content, confidence } => {
1256                let node = KnowledgeNode {
1257                    id: uuid::Uuid::new_v4().to_string(),
1258                    node_type: node_type.clone(),
1259                    scope: scope.clone(),
1260                    project_id: project_id.clone(),
1261                    content: content.clone(),
1262                    confidence: *confidence,
1263                    status: NodeStatus::Active,
1264                    created_at: Utc::now(),
1265                    updated_at: Utc::now(),
1266                    projected_at: None,
1267                    pr_url: None,
1268                };
1269                insert_node(conn, &node)?;
1270                result.nodes_created += 1;
1271            }
1272            GraphOperation::UpdateNode { id, confidence, content } => {
1273                if let Some(conf) = confidence {
1274                    update_node_confidence(conn, id, *conf)?;
1275                }
1276                if let Some(cont) = content {
1277                    update_node_content(conn, id, cont)?;
1278                }
1279                result.nodes_updated += 1;
1280            }
1281            GraphOperation::CreateEdge { source_id, target_id, edge_type } => {
1282                let edge = KnowledgeEdge {
1283                    source_id: source_id.clone(),
1284                    target_id: target_id.clone(),
1285                    edge_type: edge_type.clone(),
1286                    created_at: Utc::now(),
1287                };
1288                insert_edge(conn, &edge)?;
1289                result.edges_created += 1;
1290            }
1291            GraphOperation::MergeNodes { keep_id, remove_id } => {
1292                supersede_node(conn, keep_id, remove_id)?;
1293                result.nodes_merged += 1;
1294            }
1295        }
1296    }
1297
1298    Ok(result)
1299}
1300
1301// ── Knowledge graph: Project operations ──
1302
/// Derive a lowercase, hyphen-separated slug from the last component of
/// `repo_path`.
///
/// The final path component is lowercased and every run of non-alphanumeric
/// characters becomes a single `-`, with no leading or trailing hyphen. When
/// the path has no usable final component, or nothing alphanumeric remains,
/// the slug is `"unnamed-project"`.
pub fn generate_project_slug(repo_path: &str) -> String {
    const FALLBACK: &str = "unnamed-project";

    let dir_name = std::path::Path::new(repo_path)
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or(FALLBACK);

    // Splitting on non-alphanumerics and discarding empty pieces trims the
    // ends and collapses repeated separators in one step.
    let lowered = dir_name.to_lowercase();
    let words: Vec<&str> = lowered
        .split(|c: char| !c.is_alphanumeric())
        .filter(|word| !word.is_empty())
        .collect();

    if words.is_empty() {
        FALLBACK.to_string()
    } else {
        words.join("-")
    }
}
1336
1337/// Generate a unique project slug, appending -2, -3, etc. if needed.
1338pub fn generate_unique_project_slug(conn: &Connection, repo_path: &str) -> Result<String, CoreError> {
1339    let base = generate_project_slug(repo_path);
1340    if get_project(conn, &base)?.is_none() {
1341        return Ok(base);
1342    }
1343    for i in 2..100 {
1344        let candidate = format!("{base}-{i}");
1345        if get_project(conn, &candidate)?.is_none() {
1346            return Ok(candidate);
1347        }
1348    }
1349    Ok(format!("{base}-{}", &uuid::Uuid::new_v4().to_string()[..8]))
1350}
1351
1352pub fn upsert_project(conn: &Connection, project: &KnowledgeProject) -> Result<(), CoreError> {
1353    conn.execute(
1354        "INSERT INTO projects (id, path, remote_url, agent_type, last_seen)
1355         VALUES (?1, ?2, ?3, ?4, ?5)
1356         ON CONFLICT(id) DO UPDATE SET
1357             path = excluded.path,
1358             remote_url = COALESCE(excluded.remote_url, projects.remote_url),
1359             last_seen = excluded.last_seen",
1360        params![
1361            project.id,
1362            project.path,
1363            project.remote_url,
1364            project.agent_type,
1365            project.last_seen.to_rfc3339(),
1366        ],
1367    )?;
1368    Ok(())
1369}
1370
1371pub fn get_project(conn: &Connection, id: &str) -> Result<Option<KnowledgeProject>, CoreError> {
1372    let mut stmt = conn.prepare(
1373        "SELECT id, path, remote_url, agent_type, last_seen FROM projects WHERE id = ?1",
1374    )?;
1375    let result = stmt.query_row(params![id], |row| {
1376        Ok(KnowledgeProject {
1377            id: row.get(0)?,
1378            path: row.get(1)?,
1379            remote_url: row.get(2)?,
1380            agent_type: row.get(3)?,
1381            last_seen: DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)
1382                .unwrap_or_default()
1383                .with_timezone(&Utc),
1384        })
1385    }).optional()?;
1386    Ok(result)
1387}
1388
1389pub fn get_project_by_remote_url(conn: &Connection, remote_url: &str) -> Result<Option<KnowledgeProject>, CoreError> {
1390    let mut stmt = conn.prepare(
1391        "SELECT id, path, remote_url, agent_type, last_seen FROM projects WHERE remote_url = ?1",
1392    )?;
1393    let result = stmt.query_row(params![remote_url], |row| {
1394        Ok(KnowledgeProject {
1395            id: row.get(0)?,
1396            path: row.get(1)?,
1397            remote_url: row.get(2)?,
1398            agent_type: row.get(3)?,
1399            last_seen: DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)
1400                .unwrap_or_default()
1401                .with_timezone(&Utc),
1402        })
1403    }).optional()?;
1404    Ok(result)
1405}
1406
1407/// Get active nodes that haven't been projected yet, ordered by confidence DESC.
1408pub fn get_unprojected_nodes(conn: &Connection) -> Result<Vec<KnowledgeNode>, CoreError> {
1409    let mut stmt = conn.prepare(
1410        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1411         FROM nodes WHERE status = 'active' AND projected_at IS NULL
1412         ORDER BY confidence DESC",
1413    )?;
1414    let rows = stmt.query_map([], |row| {
1415        Ok(KnowledgeNode {
1416            id: row.get(0)?,
1417            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1418            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1419            project_id: row.get(3)?,
1420            content: row.get(4)?,
1421            confidence: row.get(5)?,
1422            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1423            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1424                .unwrap_or_default()
1425                .with_timezone(&Utc),
1426            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1427                .unwrap_or_default()
1428                .with_timezone(&Utc),
1429            projected_at: row.get(9)?,
1430            pr_url: row.get(10)?,
1431        })
1432    })?;
1433    let mut nodes = Vec::new();
1434    for row in rows {
1435        nodes.push(row?);
1436    }
1437    Ok(nodes)
1438}
1439
1440/// Mark a node as projected (direct write, no PR).
1441pub fn mark_node_projected(conn: &Connection, id: &str) -> Result<(), CoreError> {
1442    conn.execute(
1443        "UPDATE nodes SET projected_at = ?1 WHERE id = ?2",
1444        params![Utc::now().to_rfc3339(), id],
1445    )?;
1446    Ok(())
1447}
1448
1449/// Mark a node as projected via PR.
1450pub fn mark_node_projected_with_pr(conn: &Connection, id: &str, pr_url: &str) -> Result<(), CoreError> {
1451    conn.execute(
1452        "UPDATE nodes SET projected_at = ?1, pr_url = ?2 WHERE id = ?3",
1453        params![Utc::now().to_rfc3339(), pr_url, id],
1454    )?;
1455    Ok(())
1456}
1457
1458/// Get all nodes with an associated PR URL.
1459pub fn get_nodes_with_pr(conn: &Connection) -> Result<Vec<KnowledgeNode>, CoreError> {
1460    let mut stmt = conn.prepare(
1461        "SELECT id, type, scope, project_id, content, confidence, status, created_at, updated_at, projected_at, pr_url
1462         FROM nodes WHERE pr_url IS NOT NULL",
1463    )?;
1464    let rows = stmt.query_map([], |row| {
1465        Ok(KnowledgeNode {
1466            id: row.get(0)?,
1467            node_type: NodeType::from_str(&row.get::<_, String>(1)?),
1468            scope: NodeScope::from_str(&row.get::<_, String>(2)?),
1469            project_id: row.get(3)?,
1470            content: row.get(4)?,
1471            confidence: row.get(5)?,
1472            status: NodeStatus::from_str(&row.get::<_, String>(6)?),
1473            created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(7)?)
1474                .unwrap_or_default()
1475                .with_timezone(&Utc),
1476            updated_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
1477                .unwrap_or_default()
1478                .with_timezone(&Utc),
1479            projected_at: row.get(9)?,
1480            pr_url: row.get(10)?,
1481        })
1482    })?;
1483    let mut nodes = Vec::new();
1484    for row in rows {
1485        nodes.push(row?);
1486    }
1487    Ok(nodes)
1488}
1489
1490/// Clear PR URL from nodes after merge.
1491pub fn clear_node_pr(conn: &Connection, pr_url: &str) -> Result<(), CoreError> {
1492    conn.execute(
1493        "UPDATE nodes SET pr_url = NULL WHERE pr_url = ?1",
1494        params![pr_url],
1495    )?;
1496    Ok(())
1497}
1498
1499/// Dismiss all nodes for a closed PR.
1500pub fn dismiss_nodes_for_pr(conn: &Connection, pr_url: &str) -> Result<(), CoreError> {
1501    conn.execute(
1502        "UPDATE nodes SET status = 'dismissed', pr_url = NULL WHERE pr_url = ?1",
1503        params![pr_url],
1504    )?;
1505    Ok(())
1506}
1507
1508pub fn get_all_projects(conn: &Connection) -> Result<Vec<KnowledgeProject>, CoreError> {
1509    let mut stmt = conn.prepare(
1510        "SELECT id, path, remote_url, agent_type, last_seen FROM projects ORDER BY last_seen DESC",
1511    )?;
1512    let rows = stmt.query_map([], |row| {
1513        Ok(KnowledgeProject {
1514            id: row.get(0)?,
1515            path: row.get(1)?,
1516            remote_url: row.get(2)?,
1517            agent_type: row.get(3)?,
1518            last_seen: DateTime::parse_from_rfc3339(&row.get::<_, String>(4)?)
1519                .unwrap_or_default()
1520                .with_timezone(&Utc),
1521        })
1522    })?;
1523    let mut projects = Vec::new();
1524    for row in rows {
1525        projects.push(row?);
1526    }
1527    Ok(projects)
1528}
1529
1530#[cfg(test)]
1531mod tests {
1532    use super::*;
1533    use crate::models::*;
1534
1535    fn test_db() -> Connection {
1536        let conn = Connection::open_in_memory().unwrap();
1537        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
1538        migrate(&conn).unwrap();
1539        conn
1540    }
1541
1542    fn test_pattern(id: &str, description: &str) -> Pattern {
1543        Pattern {
1544            id: id.to_string(),
1545            pattern_type: PatternType::RepetitiveInstruction,
1546            description: description.to_string(),
1547            confidence: 0.85,
1548            times_seen: 1,
1549            first_seen: Utc::now(),
1550            last_seen: Utc::now(),
1551            last_projected: None,
1552            status: PatternStatus::Discovered,
1553            source_sessions: vec!["sess-1".to_string()],
1554            related_files: vec![],
1555            suggested_content: "Always do X".to_string(),
1556            suggested_target: SuggestedTarget::ClaudeMd,
1557            project: Some("/test/project".to_string()),
1558            generation_failed: false,
1559        }
1560    }
1561
1562    #[test]
1563    fn test_insert_and_get_pattern() {
1564        let conn = test_db();
1565        let pattern = test_pattern("pat-1", "Use uv for Python packages");
1566        insert_pattern(&conn, &pattern).unwrap();
1567
1568        let patterns = get_all_patterns(&conn, None).unwrap();
1569        assert_eq!(patterns.len(), 1);
1570        assert_eq!(patterns[0].id, "pat-1");
1571        assert_eq!(patterns[0].description, "Use uv for Python packages");
1572        assert!((patterns[0].confidence - 0.85).abs() < f64::EPSILON);
1573    }
1574
1575    #[test]
1576    fn test_pattern_merge_update() {
1577        let conn = test_db();
1578        let pattern = test_pattern("pat-1", "Use uv for Python packages");
1579        insert_pattern(&conn, &pattern).unwrap();
1580
1581        update_pattern_merge(
1582            &conn,
1583            "pat-1",
1584            &["sess-2".to_string(), "sess-3".to_string()],
1585            0.92,
1586            Utc::now(),
1587            2,
1588        )
1589        .unwrap();
1590
1591        let patterns = get_all_patterns(&conn, None).unwrap();
1592        assert_eq!(patterns[0].times_seen, 3);
1593        assert!((patterns[0].confidence - 0.92).abs() < f64::EPSILON);
1594        assert_eq!(patterns[0].source_sessions.len(), 3);
1595    }
1596
1597    #[test]
1598    fn test_get_patterns_by_status() {
1599        let conn = test_db();
1600        let p1 = test_pattern("pat-1", "Pattern one");
1601        let mut p2 = test_pattern("pat-2", "Pattern two");
1602        p2.status = PatternStatus::Active;
1603        insert_pattern(&conn, &p1).unwrap();
1604        insert_pattern(&conn, &p2).unwrap();
1605
1606        let discovered = get_patterns(&conn, &["discovered"], None).unwrap();
1607        assert_eq!(discovered.len(), 1);
1608        assert_eq!(discovered[0].id, "pat-1");
1609
1610        let active = get_patterns(&conn, &["active"], None).unwrap();
1611        assert_eq!(active.len(), 1);
1612        assert_eq!(active[0].id, "pat-2");
1613
1614        let both = get_patterns(&conn, &["discovered", "active"], None).unwrap();
1615        assert_eq!(both.len(), 2);
1616    }
1617
1618    #[test]
1619    fn test_analyzed_session_tracking() {
1620        let conn = test_db();
1621        assert!(!is_session_analyzed(&conn, "sess-1").unwrap());
1622
1623        record_analyzed_session(&conn, "sess-1", "/test").unwrap();
1624        assert!(is_session_analyzed(&conn, "sess-1").unwrap());
1625        assert!(!is_session_analyzed(&conn, "sess-2").unwrap());
1626    }
1627
1628    #[test]
1629    fn test_sessions_for_analysis() {
1630        let conn = test_db();
1631
1632        // Record an ingested session
1633        let session = IngestedSession {
1634            session_id: "sess-1".to_string(),
1635            project: "/test".to_string(),
1636            session_path: "/tmp/test.jsonl".to_string(),
1637            file_size: 100,
1638            file_mtime: "2026-01-01T00:00:00Z".to_string(),
1639            ingested_at: Utc::now(),
1640        };
1641        record_ingested_session(&conn, &session).unwrap();
1642
1643        // It should appear in sessions for analysis (non-rolling)
1644        let since = Utc::now() - chrono::Duration::days(14);
1645        let pending = get_sessions_for_analysis(&conn, None, &since, false).unwrap();
1646        assert_eq!(pending.len(), 1);
1647
1648        // After marking as analyzed, it should not appear in non-rolling mode
1649        record_analyzed_session(&conn, "sess-1", "/test").unwrap();
1650        let pending = get_sessions_for_analysis(&conn, None, &since, false).unwrap();
1651        assert_eq!(pending.len(), 0);
1652
1653        // But it SHOULD still appear in rolling window mode
1654        let pending = get_sessions_for_analysis(&conn, None, &since, true).unwrap();
1655        assert_eq!(pending.len(), 1);
1656    }
1657
1658    #[test]
1659    fn test_insert_and_check_projection() {
1660        let conn = test_db();
1661        let pattern = test_pattern("pat-1", "Use uv");
1662        insert_pattern(&conn, &pattern).unwrap();
1663
1664        assert!(!has_projection_for_pattern(&conn, "pat-1").unwrap());
1665
1666        let proj = Projection {
1667            id: "proj-1".to_string(),
1668            pattern_id: "pat-1".to_string(),
1669            target_type: "claude_md".to_string(),
1670            target_path: "/test/CLAUDE.md".to_string(),
1671            content: "Always use uv".to_string(),
1672            applied_at: Utc::now(),
1673            pr_url: None,
1674            status: ProjectionStatus::Applied,
1675        };
1676        insert_projection(&conn, &proj).unwrap();
1677
1678        assert!(has_projection_for_pattern(&conn, "pat-1").unwrap());
1679        assert!(!has_projection_for_pattern(&conn, "pat-2").unwrap());
1680    }
1681
1682    #[test]
1683    fn test_update_pattern_status() {
1684        let conn = test_db();
1685        let pattern = test_pattern("pat-1", "Test pattern");
1686        insert_pattern(&conn, &pattern).unwrap();
1687
1688        update_pattern_status(&conn, "pat-1", &PatternStatus::Active).unwrap();
1689        let patterns = get_patterns(&conn, &["active"], None).unwrap();
1690        assert_eq!(patterns.len(), 1);
1691        assert_eq!(patterns[0].id, "pat-1");
1692    }
1693
1694    #[test]
1695    fn test_set_generation_failed() {
1696        let conn = test_db();
1697        let pattern = test_pattern("pat-1", "Test pattern");
1698        insert_pattern(&conn, &pattern).unwrap();
1699
1700        assert!(!get_all_patterns(&conn, None).unwrap()[0].generation_failed);
1701
1702        set_generation_failed(&conn, "pat-1", true).unwrap();
1703        assert!(get_all_patterns(&conn, None).unwrap()[0].generation_failed);
1704
1705        set_generation_failed(&conn, "pat-1", false).unwrap();
1706        assert!(!get_all_patterns(&conn, None).unwrap()[0].generation_failed);
1707    }
1708
1709    #[test]
1710    fn test_projections_nudged_column_defaults_to_zero() {
1711        let conn = test_db();
1712
1713        // Verify the nudged column exists by preparing a statement that references it
1714        conn.prepare("SELECT nudged FROM projections").unwrap();
1715
1716        // Insert a projection without specifying nudged — should default to 0
1717        let pattern = test_pattern("pat-1", "Test pattern");
1718        insert_pattern(&conn, &pattern).unwrap();
1719
1720        let proj = Projection {
1721            id: "proj-1".to_string(),
1722            pattern_id: "pat-1".to_string(),
1723            target_type: "claude_md".to_string(),
1724            target_path: "/test/CLAUDE.md".to_string(),
1725            content: "Always use uv".to_string(),
1726            applied_at: Utc::now(),
1727            pr_url: None,
1728            status: ProjectionStatus::Applied,
1729        };
1730        insert_projection(&conn, &proj).unwrap();
1731
1732        let nudged: i64 = conn
1733            .query_row(
1734                "SELECT nudged FROM projections WHERE id = 'proj-1'",
1735                [],
1736                |row| row.get(0),
1737            )
1738            .unwrap();
1739        assert_eq!(nudged, 0, "nudged column should default to 0");
1740    }
1741
1742    // ── Tests for auto-apply pipeline DB functions ──
1743
1744    #[test]
1745    fn test_last_applied_at_empty() {
1746        let conn = test_db();
1747        let result = last_applied_at(&conn).unwrap();
1748        assert_eq!(result, None);
1749    }
1750
1751    #[test]
1752    fn test_last_applied_at_returns_max() {
1753        let conn = test_db();
1754
1755        // Insert two patterns to serve as FK targets
1756        let p1 = test_pattern("pat-1", "Pattern one");
1757        let p2 = test_pattern("pat-2", "Pattern two");
1758        insert_pattern(&conn, &p1).unwrap();
1759        insert_pattern(&conn, &p2).unwrap();
1760
1761        // Insert projections with different timestamps
1762        let earlier = chrono::DateTime::parse_from_rfc3339("2026-01-10T00:00:00Z")
1763            .unwrap()
1764            .with_timezone(&Utc);
1765        let later = chrono::DateTime::parse_from_rfc3339("2026-02-15T12:00:00Z")
1766            .unwrap()
1767            .with_timezone(&Utc);
1768
1769        let proj1 = Projection {
1770            id: "proj-1".to_string(),
1771            pattern_id: "pat-1".to_string(),
1772            target_type: "Skill".to_string(),
1773            target_path: "/path/a".to_string(),
1774            content: "content a".to_string(),
1775            applied_at: earlier,
1776            pr_url: None,
1777            status: ProjectionStatus::Applied,
1778        };
1779        let proj2 = Projection {
1780            id: "proj-2".to_string(),
1781            pattern_id: "pat-2".to_string(),
1782            target_type: "Skill".to_string(),
1783            target_path: "/path/b".to_string(),
1784            content: "content b".to_string(),
1785            applied_at: later,
1786            pr_url: None,
1787            status: ProjectionStatus::Applied,
1788        };
1789        insert_projection(&conn, &proj1).unwrap();
1790        insert_projection(&conn, &proj2).unwrap();
1791
1792        let result = last_applied_at(&conn).unwrap();
1793        assert!(result.is_some());
1794        // The max should be the later timestamp
1795        let max_ts = result.unwrap();
1796        assert!(max_ts.contains("2026-02-15"), "Expected later timestamp, got: {}", max_ts);
1797    }
1798
1799    #[test]
1800    fn test_has_unanalyzed_sessions_empty() {
1801        let conn = test_db();
1802        assert!(!has_unanalyzed_sessions(&conn).unwrap());
1803    }
1804
1805    #[test]
1806    fn test_has_unanalyzed_sessions_with_new_session() {
1807        let conn = test_db();
1808
1809        let session = IngestedSession {
1810            session_id: "sess-1".to_string(),
1811            project: "/test".to_string(),
1812            session_path: "/tmp/test.jsonl".to_string(),
1813            file_size: 100,
1814            file_mtime: "2026-01-01T00:00:00Z".to_string(),
1815            ingested_at: Utc::now(),
1816        };
1817        record_ingested_session(&conn, &session).unwrap();
1818
1819        assert!(has_unanalyzed_sessions(&conn).unwrap());
1820    }
1821
1822    #[test]
1823    fn test_has_unanalyzed_sessions_after_analysis() {
1824        let conn = test_db();
1825
1826        let session = IngestedSession {
1827            session_id: "sess-1".to_string(),
1828            project: "/test".to_string(),
1829            session_path: "/tmp/test.jsonl".to_string(),
1830            file_size: 100,
1831            file_mtime: "2026-01-01T00:00:00Z".to_string(),
1832            ingested_at: Utc::now(),
1833        };
1834        record_ingested_session(&conn, &session).unwrap();
1835        record_analyzed_session(&conn, "sess-1", "/test").unwrap();
1836
1837        assert!(!has_unanalyzed_sessions(&conn).unwrap());
1838    }
1839
1840    #[test]
1841    fn test_has_unprojected_patterns_empty() {
1842        let conn = test_db();
1843        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1844    }
1845
1846    #[test]
1847    fn test_has_unprojected_patterns_with_discovered() {
1848        let conn = test_db();
1849
1850        let pattern = test_pattern("pat-1", "Use uv for Python");
1851        insert_pattern(&conn, &pattern).unwrap();
1852
1853        assert!(has_unprojected_patterns(&conn, 0.0).unwrap());
1854    }
1855
1856    #[test]
1857    fn test_has_unprojected_patterns_after_projection() {
1858        let conn = test_db();
1859
1860        let pattern = test_pattern("pat-1", "Use uv for Python");
1861        insert_pattern(&conn, &pattern).unwrap();
1862
1863        let proj = Projection {
1864            id: "proj-1".to_string(),
1865            pattern_id: "pat-1".to_string(),
1866            target_type: "Skill".to_string(),
1867            target_path: "/path".to_string(),
1868            content: "content".to_string(),
1869            applied_at: Utc::now(),
1870            pr_url: Some("https://github.com/test/pull/1".to_string()),
1871            status: ProjectionStatus::Applied,
1872        };
1873        insert_projection(&conn, &proj).unwrap();
1874
1875        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1876    }
1877
1878    #[test]
1879    fn test_has_unprojected_patterns_excludes_generation_failed() {
1880        let conn = test_db();
1881
1882        let pattern = test_pattern("pat-1", "Use uv for Python");
1883        insert_pattern(&conn, &pattern).unwrap();
1884        set_generation_failed(&conn, "pat-1", true).unwrap();
1885
1886        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1887    }
1888
1889    #[test]
1890    fn test_has_unprojected_patterns_excludes_dbonly() {
1891        let conn = test_db();
1892
1893        let mut pattern = test_pattern("pat-1", "Internal tracking only");
1894        pattern.suggested_target = SuggestedTarget::DbOnly;
1895        insert_pattern(&conn, &pattern).unwrap();
1896
1897        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1898    }
1899
1900    #[test]
1901    fn test_auto_apply_data_triggers_full_flow() {
1902        let conn = test_db();
1903
1904        // Initially: no data, no triggers
1905        assert!(!has_unanalyzed_sessions(&conn).unwrap());
1906        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1907
1908        // Step 1: Ingest creates sessions → triggers analyze
1909        let session = IngestedSession {
1910            session_id: "sess-1".to_string(),
1911            project: "/proj".to_string(),
1912            session_path: "/path/sess".to_string(),
1913            file_size: 100,
1914            file_mtime: "2025-01-01T00:00:00Z".to_string(),
1915            ingested_at: Utc::now(),
1916        };
1917        record_ingested_session(&conn, &session).unwrap();
1918        assert!(has_unanalyzed_sessions(&conn).unwrap());
1919
1920        // Step 2: After analysis → sessions marked, patterns created → triggers apply
1921        record_analyzed_session(&conn, "sess-1", "/proj").unwrap();
1922        assert!(!has_unanalyzed_sessions(&conn).unwrap());
1923
1924        let p = test_pattern("pat-1", "Always use cargo fmt");
1925        insert_pattern(&conn, &p).unwrap();
1926        assert!(has_unprojected_patterns(&conn, 0.0).unwrap());
1927
1928        // Step 3: After apply → projection created with PR URL
1929        let proj = Projection {
1930            id: "proj-1".to_string(),
1931            pattern_id: "pat-1".to_string(),
1932            target_type: "Skill".to_string(),
1933            target_path: "/skills/cargo-fmt.md".to_string(),
1934            content: "skill content".to_string(),
1935            applied_at: Utc::now(),
1936            pr_url: Some("https://github.com/test/pull/42".to_string()),
1937            status: ProjectionStatus::Applied,
1938        };
1939        insert_projection(&conn, &proj).unwrap();
1940        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
1941    }
1942
1943    #[test]
1944    fn test_get_last_nudge_at_empty() {
1945        let conn = test_db();
1946        assert!(get_last_nudge_at(&conn).unwrap().is_none());
1947    }
1948
1949    #[test]
1950    fn test_unanalyzed_session_count() {
1951        let conn = test_db();
1952        assert_eq!(unanalyzed_session_count(&conn).unwrap(), 0);
1953
1954        // Add 3 sessions
1955        for i in 1..=3 {
1956            let session = IngestedSession {
1957                session_id: format!("sess-{i}"),
1958                project: "/proj".to_string(),
1959                session_path: format!("/path/sess-{i}"),
1960                file_size: 100,
1961                file_mtime: "2025-01-01T00:00:00Z".to_string(),
1962                ingested_at: Utc::now(),
1963            };
1964            record_ingested_session(&conn, &session).unwrap();
1965        }
1966        assert_eq!(unanalyzed_session_count(&conn).unwrap(), 3);
1967
1968        // Analyze one
1969        record_analyzed_session(&conn, "sess-1", "/proj").unwrap();
1970        assert_eq!(unanalyzed_session_count(&conn).unwrap(), 2);
1971    }
1972
1973    #[test]
1974    fn test_set_and_get_last_nudge_at() {
1975        let conn = test_db();
1976        let now = Utc::now();
1977        set_last_nudge_at(&conn, &now).unwrap();
1978        let result = get_last_nudge_at(&conn).unwrap().unwrap();
1979        // Compare to second precision (DB stores RFC 3339)
1980        assert_eq!(
1981            result.format("%Y-%m-%dT%H:%M:%S").to_string(),
1982            now.format("%Y-%m-%dT%H:%M:%S").to_string()
1983        );
1984    }
1985
1986    #[test]
1987    fn test_projection_status_column_exists() {
1988        let conn = test_db();
1989        let pattern = test_pattern("pat-1", "Test");
1990        insert_pattern(&conn, &pattern).unwrap();
1991
1992        let proj = Projection {
1993            id: "proj-1".to_string(),
1994            pattern_id: "pat-1".to_string(),
1995            target_type: "skill".to_string(),
1996            target_path: "/test/skill.md".to_string(),
1997            content: "content".to_string(),
1998            applied_at: Utc::now(),
1999            pr_url: None,
2000            status: ProjectionStatus::PendingReview,
2001        };
2002        insert_projection(&conn, &proj).unwrap();
2003
2004        let status: String = conn
2005            .query_row(
2006                "SELECT status FROM projections WHERE id = 'proj-1'",
2007                [],
2008                |row| row.get(0),
2009            )
2010            .unwrap();
2011        assert_eq!(status, "pending_review");
2012    }
2013
2014    #[test]
2015    fn test_existing_projections_default_to_applied() {
2016        // Simulate a v2 database with existing projections
2017        let conn = Connection::open_in_memory().unwrap();
2018        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2019
2020        // Create v1 schema manually
2021        conn.execute_batch(
2022            "CREATE TABLE patterns (
2023                id TEXT PRIMARY KEY, pattern_type TEXT NOT NULL, description TEXT NOT NULL,
2024                confidence REAL NOT NULL, times_seen INTEGER NOT NULL DEFAULT 1,
2025                first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, last_projected TEXT,
2026                status TEXT NOT NULL DEFAULT 'discovered', source_sessions TEXT NOT NULL,
2027                related_files TEXT NOT NULL, suggested_content TEXT NOT NULL,
2028                suggested_target TEXT NOT NULL, project TEXT,
2029                generation_failed INTEGER NOT NULL DEFAULT 0
2030            );
2031            CREATE TABLE projections (
2032                id TEXT PRIMARY KEY, pattern_id TEXT NOT NULL REFERENCES patterns(id),
2033                target_type TEXT NOT NULL, target_path TEXT NOT NULL, content TEXT NOT NULL,
2034                applied_at TEXT NOT NULL, pr_url TEXT, nudged INTEGER NOT NULL DEFAULT 0
2035            );
2036            CREATE TABLE analyzed_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, analyzed_at TEXT NOT NULL);
2037            CREATE TABLE ingested_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, session_path TEXT NOT NULL, file_size INTEGER NOT NULL, file_mtime TEXT NOT NULL, ingested_at TEXT NOT NULL);
2038            PRAGMA user_version = 1;",
2039        ).unwrap();
2040
2041        // Insert a pattern first (FK target)
2042        conn.execute(
2043            "INSERT INTO patterns (id, pattern_type, description, confidence, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target)
2044             VALUES ('pat-1', 'workflow_pattern', 'Test', 0.8, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z', 'discovered', '[]', '[]', 'content', 'skill')",
2045            [],
2046        ).unwrap();
2047
2048        // Insert an old-style projection (no status column)
2049        conn.execute(
2050            "INSERT INTO projections (id, pattern_id, target_type, target_path, content, applied_at)
2051             VALUES ('proj-old', 'pat-1', 'skill', '/path', 'content', '2026-01-01T00:00:00Z')",
2052            [],
2053        ).unwrap();
2054
2055        // Now run migration (open_db equivalent)
2056        migrate(&conn).unwrap();
2057
2058        // Old projection should have status = 'applied'
2059        let status: String = conn
2060            .query_row("SELECT status FROM projections WHERE id = 'proj-old'", [], |row| row.get(0))
2061            .unwrap();
2062        assert_eq!(status, "applied");
2063    }
2064
2065    #[test]
2066    fn test_get_pending_review_projections() {
2067        let conn = test_db();
2068        let p1 = test_pattern("pat-1", "Pattern one");
2069        let p2 = test_pattern("pat-2", "Pattern two");
2070        insert_pattern(&conn, &p1).unwrap();
2071        insert_pattern(&conn, &p2).unwrap();
2072
2073        // One pending, one applied
2074        let proj1 = Projection {
2075            id: "proj-1".to_string(),
2076            pattern_id: "pat-1".to_string(),
2077            target_type: "skill".to_string(),
2078            target_path: "/test/a.md".to_string(),
2079            content: "content a".to_string(),
2080            applied_at: Utc::now(),
2081            pr_url: None,
2082            status: ProjectionStatus::PendingReview,
2083        };
2084        let proj2 = Projection {
2085            id: "proj-2".to_string(),
2086            pattern_id: "pat-2".to_string(),
2087            target_type: "skill".to_string(),
2088            target_path: "/test/b.md".to_string(),
2089            content: "content b".to_string(),
2090            applied_at: Utc::now(),
2091            pr_url: None,
2092            status: ProjectionStatus::Applied,
2093        };
2094        insert_projection(&conn, &proj1).unwrap();
2095        insert_projection(&conn, &proj2).unwrap();
2096
2097        let pending = get_pending_review_projections(&conn).unwrap();
2098        assert_eq!(pending.len(), 1);
2099        assert_eq!(pending[0].id, "proj-1");
2100    }
2101
2102    #[test]
2103    fn test_update_projection_status() {
2104        let conn = test_db();
2105        let p = test_pattern("pat-1", "Pattern");
2106        insert_pattern(&conn, &p).unwrap();
2107
2108        let proj = Projection {
2109            id: "proj-1".to_string(),
2110            pattern_id: "pat-1".to_string(),
2111            target_type: "skill".to_string(),
2112            target_path: "/test.md".to_string(),
2113            content: "content".to_string(),
2114            applied_at: Utc::now(),
2115            pr_url: None,
2116            status: ProjectionStatus::PendingReview,
2117        };
2118        insert_projection(&conn, &proj).unwrap();
2119
2120        update_projection_status(&conn, "proj-1", &ProjectionStatus::Applied).unwrap();
2121
2122        let status: String = conn
2123            .query_row("SELECT status FROM projections WHERE id = 'proj-1'", [], |row| row.get(0))
2124            .unwrap();
2125        assert_eq!(status, "applied");
2126    }
2127
2128    #[test]
2129    fn test_delete_projection() {
2130        let conn = test_db();
2131        let p = test_pattern("pat-1", "Pattern");
2132        insert_pattern(&conn, &p).unwrap();
2133
2134        let proj = Projection {
2135            id: "proj-1".to_string(),
2136            pattern_id: "pat-1".to_string(),
2137            target_type: "skill".to_string(),
2138            target_path: "/test.md".to_string(),
2139            content: "content".to_string(),
2140            applied_at: Utc::now(),
2141            pr_url: None,
2142            status: ProjectionStatus::PendingReview,
2143        };
2144        insert_projection(&conn, &proj).unwrap();
2145        assert!(has_projection_for_pattern(&conn, "pat-1").unwrap());
2146
2147        delete_projection(&conn, "proj-1").unwrap();
2148        assert!(!has_projection_for_pattern(&conn, "pat-1").unwrap());
2149    }
2150
2151    #[test]
2152    fn test_get_projections_with_pr_url() {
2153        let conn = test_db();
2154        let p1 = test_pattern("pat-1", "Pattern one");
2155        let p2 = test_pattern("pat-2", "Pattern two");
2156        insert_pattern(&conn, &p1).unwrap();
2157        insert_pattern(&conn, &p2).unwrap();
2158
2159        let proj1 = Projection {
2160            id: "proj-1".to_string(),
2161            pattern_id: "pat-1".to_string(),
2162            target_type: "skill".to_string(),
2163            target_path: "/a.md".to_string(),
2164            content: "a".to_string(),
2165            applied_at: Utc::now(),
2166            pr_url: Some("https://github.com/test/pull/1".to_string()),
2167            status: ProjectionStatus::Applied,
2168        };
2169        let proj2 = Projection {
2170            id: "proj-2".to_string(),
2171            pattern_id: "pat-2".to_string(),
2172            target_type: "skill".to_string(),
2173            target_path: "/b.md".to_string(),
2174            content: "b".to_string(),
2175            applied_at: Utc::now(),
2176            pr_url: None,
2177            status: ProjectionStatus::Applied,
2178        };
2179        insert_projection(&conn, &proj1).unwrap();
2180        insert_projection(&conn, &proj2).unwrap();
2181
2182        let with_pr = get_applied_projections_with_pr(&conn).unwrap();
2183        assert_eq!(with_pr.len(), 1);
2184        assert_eq!(with_pr[0].pr_url, Some("https://github.com/test/pull/1".to_string()));
2185    }
2186
2187    #[test]
2188    fn test_get_projected_pattern_ids_by_status() {
2189        let conn = test_db();
2190        let p1 = test_pattern("pat-1", "Pattern one");
2191        let p2 = test_pattern("pat-2", "Pattern two");
2192        insert_pattern(&conn, &p1).unwrap();
2193        insert_pattern(&conn, &p2).unwrap();
2194
2195        let proj1 = Projection {
2196            id: "proj-1".to_string(),
2197            pattern_id: "pat-1".to_string(),
2198            target_type: "skill".to_string(),
2199            target_path: "/a.md".to_string(),
2200            content: "a".to_string(),
2201            applied_at: Utc::now(),
2202            pr_url: None,
2203            status: ProjectionStatus::Applied,
2204        };
2205        let proj2 = Projection {
2206            id: "proj-2".to_string(),
2207            pattern_id: "pat-2".to_string(),
2208            target_type: "skill".to_string(),
2209            target_path: "/b.md".to_string(),
2210            content: "b".to_string(),
2211            applied_at: Utc::now(),
2212            pr_url: None,
2213            status: ProjectionStatus::PendingReview,
2214        };
2215        insert_projection(&conn, &proj1).unwrap();
2216        insert_projection(&conn, &proj2).unwrap();
2217
2218        let ids = get_projected_pattern_ids_by_status(&conn, &[ProjectionStatus::Applied, ProjectionStatus::PendingReview]).unwrap();
2219        assert_eq!(ids.len(), 2);
2220
2221        let ids_applied_only = get_projected_pattern_ids_by_status(&conn, &[ProjectionStatus::Applied]).unwrap();
2222        assert_eq!(ids_applied_only.len(), 1);
2223        assert!(ids_applied_only.contains("pat-1"));
2224    }
2225
2226    #[test]
2227    fn test_has_unprojected_patterns_excludes_dismissed() {
2228        let conn = test_db();
2229
2230        let mut pattern = test_pattern("pat-1", "Dismissed pattern");
2231        pattern.status = PatternStatus::Dismissed;
2232        insert_pattern(&conn, &pattern).unwrap();
2233
2234        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
2235    }
2236
2237    #[test]
2238    fn test_has_unprojected_patterns_excludes_pending_review() {
2239        let conn = test_db();
2240
2241        let pattern = test_pattern("pat-1", "Pattern with pending review");
2242        insert_pattern(&conn, &pattern).unwrap();
2243
2244        // Create a pending_review projection
2245        let proj = Projection {
2246            id: "proj-1".to_string(),
2247            pattern_id: "pat-1".to_string(),
2248            target_type: "skill".to_string(),
2249            target_path: "/test.md".to_string(),
2250            content: "content".to_string(),
2251            applied_at: Utc::now(),
2252            pr_url: None,
2253            status: ProjectionStatus::PendingReview,
2254        };
2255        insert_projection(&conn, &proj).unwrap();
2256
2257        // Pattern already has a pending_review projection — should NOT be "unprojected"
2258        assert!(!has_unprojected_patterns(&conn, 0.0).unwrap());
2259    }
2260
2261    #[test]
2262    fn test_v4_migration_creates_tables() {
2263        let conn = Connection::open_in_memory().unwrap();
2264        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2265        migrate(&conn).unwrap();
2266
2267        let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0)).unwrap();
2268        assert_eq!(version, 5);
2269
2270        // Verify nodes table exists with correct columns
2271        let count: i64 = conn.query_row(
2272            "SELECT COUNT(*) FROM nodes WHERE 1=0", [], |row| row.get(0)
2273        ).unwrap();
2274        assert_eq!(count, 0);
2275
2276        // Verify edges table exists
2277        let count: i64 = conn.query_row(
2278            "SELECT COUNT(*) FROM edges WHERE 1=0", [], |row| row.get(0)
2279        ).unwrap();
2280        assert_eq!(count, 0);
2281
2282        // Verify projects table exists
2283        let count: i64 = conn.query_row(
2284            "SELECT COUNT(*) FROM projects WHERE 1=0", [], |row| row.get(0)
2285        ).unwrap();
2286        assert_eq!(count, 0);
2287    }
2288
2289    // ── Node CRUD tests ──
2290
2291    #[test]
2292    fn test_insert_and_get_node() {
2293        let conn = Connection::open_in_memory().unwrap();
2294        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2295        migrate(&conn).unwrap();
2296
2297        let node = KnowledgeNode {
2298            id: "node-1".to_string(),
2299            node_type: NodeType::Rule,
2300            scope: NodeScope::Project,
2301            project_id: Some("my-app".to_string()),
2302            content: "Always run tests".to_string(),
2303            confidence: 0.85,
2304            status: NodeStatus::Active,
2305            created_at: Utc::now(),
2306            updated_at: Utc::now(),
2307            projected_at: None,
2308            pr_url: None,
2309        };
2310
2311        insert_node(&conn, &node).unwrap();
2312        let retrieved = get_node(&conn, "node-1").unwrap().unwrap();
2313        assert_eq!(retrieved.content, "Always run tests");
2314        assert_eq!(retrieved.node_type, NodeType::Rule);
2315        assert_eq!(retrieved.scope, NodeScope::Project);
2316        assert_eq!(retrieved.confidence, 0.85);
2317    }
2318
2319    #[test]
2320    fn test_get_nodes_by_scope_and_status() {
2321        let conn = Connection::open_in_memory().unwrap();
2322        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2323        migrate(&conn).unwrap();
2324
2325        let now = Utc::now();
2326        for (i, scope) in [NodeScope::Global, NodeScope::Project, NodeScope::Global].iter().enumerate() {
2327            let node = KnowledgeNode {
2328                id: format!("node-{i}"),
2329                node_type: NodeType::Rule,
2330                scope: scope.clone(),
2331                project_id: if *scope == NodeScope::Project { Some("my-app".to_string()) } else { None },
2332                content: format!("Rule {i}"),
2333                confidence: 0.8,
2334                status: NodeStatus::Active,
2335                created_at: now,
2336                updated_at: now,
2337                projected_at: None,
2338                pr_url: None,
2339            };
2340            insert_node(&conn, &node).unwrap();
2341        }
2342
2343        let global_nodes = get_nodes_by_scope(&conn, &NodeScope::Global, None, &[NodeStatus::Active]).unwrap();
2344        assert_eq!(global_nodes.len(), 2);
2345
2346        let project_nodes = get_nodes_by_scope(&conn, &NodeScope::Project, Some("my-app"), &[NodeStatus::Active]).unwrap();
2347        assert_eq!(project_nodes.len(), 1);
2348    }
2349
2350    #[test]
2351    fn test_update_node_confidence() {
2352        let conn = Connection::open_in_memory().unwrap();
2353        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2354        migrate(&conn).unwrap();
2355
2356        let node = KnowledgeNode {
2357            id: "node-1".to_string(),
2358            node_type: NodeType::Pattern,
2359            scope: NodeScope::Project,
2360            project_id: Some("my-app".to_string()),
2361            content: "Forgets tests".to_string(),
2362            confidence: 0.5,
2363            status: NodeStatus::Active,
2364            created_at: Utc::now(),
2365            updated_at: Utc::now(),
2366            projected_at: None,
2367            pr_url: None,
2368        };
2369        insert_node(&conn, &node).unwrap();
2370
2371        update_node_confidence(&conn, "node-1", 0.75).unwrap();
2372        let updated = get_node(&conn, "node-1").unwrap().unwrap();
2373        assert_eq!(updated.confidence, 0.75);
2374    }
2375
2376    #[test]
2377    fn test_update_node_status() {
2378        let conn = Connection::open_in_memory().unwrap();
2379        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2380        migrate(&conn).unwrap();
2381
2382        let node = KnowledgeNode {
2383            id: "node-1".to_string(),
2384            node_type: NodeType::Rule,
2385            scope: NodeScope::Global,
2386            project_id: None,
2387            content: "Use snake_case".to_string(),
2388            confidence: 0.9,
2389            status: NodeStatus::PendingReview,
2390            created_at: Utc::now(),
2391            updated_at: Utc::now(),
2392            projected_at: None,
2393            pr_url: None,
2394        };
2395        insert_node(&conn, &node).unwrap();
2396
2397        update_node_status(&conn, "node-1", &NodeStatus::Active).unwrap();
2398        let updated = get_node(&conn, "node-1").unwrap().unwrap();
2399        assert_eq!(updated.status, NodeStatus::Active);
2400    }
2401
2402    #[test]
2403    fn test_v4_migration_from_v3() {
2404        let conn = Connection::open_in_memory().unwrap();
2405        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2406
2407        // Manually create a v3-state database (v1+v2+v3 tables, user_version=3)
2408        conn.execute_batch("
2409            CREATE TABLE patterns (id TEXT PRIMARY KEY, pattern_type TEXT NOT NULL, description TEXT NOT NULL, confidence REAL NOT NULL, times_seen INTEGER NOT NULL DEFAULT 1, first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, last_projected TEXT, status TEXT NOT NULL DEFAULT 'discovered', source_sessions TEXT NOT NULL, related_files TEXT NOT NULL, suggested_content TEXT NOT NULL, suggested_target TEXT NOT NULL, project TEXT, generation_failed INTEGER NOT NULL DEFAULT 0);
2410            CREATE TABLE projections (id TEXT PRIMARY KEY, pattern_id TEXT NOT NULL, target_type TEXT NOT NULL, target_path TEXT NOT NULL, content TEXT NOT NULL, applied_at TEXT NOT NULL, pr_url TEXT, nudged INTEGER NOT NULL DEFAULT 0, status TEXT NOT NULL DEFAULT 'applied');
2411            CREATE TABLE analyzed_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, analyzed_at TEXT NOT NULL);
2412            CREATE TABLE ingested_sessions (session_id TEXT PRIMARY KEY, project TEXT NOT NULL, session_path TEXT NOT NULL, file_size INTEGER NOT NULL, file_mtime TEXT NOT NULL, ingested_at TEXT NOT NULL);
2413            CREATE TABLE metadata (key TEXT PRIMARY KEY, value TEXT NOT NULL);
2414        ").unwrap();
2415        conn.pragma_update(None, "user_version", 3).unwrap();
2416
2417        // Now run migrate — should only add v4 tables
2418        migrate(&conn).unwrap();
2419
2420        let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0)).unwrap();
2421        assert_eq!(version, 5);
2422
2423        // v4 tables exist
2424        conn.query_row("SELECT COUNT(*) FROM nodes WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2425        conn.query_row("SELECT COUNT(*) FROM edges WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2426        conn.query_row("SELECT COUNT(*) FROM projects WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2427
2428        // Old tables still exist
2429        conn.query_row("SELECT COUNT(*) FROM patterns WHERE 1=0", [], |row| row.get::<_, i64>(0)).unwrap();
2430    }
2431
2432    // ── Edge CRUD tests ──
2433
2434    #[test]
2435    fn test_insert_and_get_edges() {
2436        let conn = Connection::open_in_memory().unwrap();
2437        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2438        migrate(&conn).unwrap();
2439
2440        let now = Utc::now();
2441        let node1 = KnowledgeNode {
2442            id: "node-1".to_string(), node_type: NodeType::Pattern,
2443            scope: NodeScope::Project, project_id: Some("app".to_string()),
2444            content: "Pattern A".to_string(), confidence: 0.5,
2445            status: NodeStatus::Active, created_at: now, updated_at: now,
2446            projected_at: None, pr_url: None,
2447        };
2448        let node2 = KnowledgeNode {
2449            id: "node-2".to_string(), node_type: NodeType::Rule,
2450            scope: NodeScope::Project, project_id: Some("app".to_string()),
2451            content: "Rule B".to_string(), confidence: 0.8,
2452            status: NodeStatus::Active, created_at: now, updated_at: now,
2453            projected_at: None, pr_url: None,
2454        };
2455        insert_node(&conn, &node1).unwrap();
2456        insert_node(&conn, &node2).unwrap();
2457
2458        let edge = KnowledgeEdge {
2459            source_id: "node-1".to_string(),
2460            target_id: "node-2".to_string(),
2461            edge_type: EdgeType::DerivedFrom,
2462            created_at: now,
2463        };
2464        insert_edge(&conn, &edge).unwrap();
2465
2466        let edges = get_edges_from(&conn, "node-1").unwrap();
2467        assert_eq!(edges.len(), 1);
2468        assert_eq!(edges[0].target_id, "node-2");
2469        assert_eq!(edges[0].edge_type, EdgeType::DerivedFrom);
2470
2471        let edges_to = get_edges_to(&conn, "node-2").unwrap();
2472        assert_eq!(edges_to.len(), 1);
2473        assert_eq!(edges_to[0].source_id, "node-1");
2474    }
2475
2476    #[test]
2477    fn test_supersede_node_archives_old() {
2478        let conn = Connection::open_in_memory().unwrap();
2479        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2480        migrate(&conn).unwrap();
2481
2482        let now = Utc::now();
2483        let old_node = KnowledgeNode {
2484            id: "old".to_string(), node_type: NodeType::Rule,
2485            scope: NodeScope::Global, project_id: None,
2486            content: "Old rule".to_string(), confidence: 0.8,
2487            status: NodeStatus::Active, created_at: now, updated_at: now,
2488            projected_at: None, pr_url: None,
2489        };
2490        let new_node = KnowledgeNode {
2491            id: "new".to_string(), node_type: NodeType::Rule,
2492            scope: NodeScope::Global, project_id: None,
2493            content: "New rule".to_string(), confidence: 0.85,
2494            status: NodeStatus::Active, created_at: now, updated_at: now,
2495            projected_at: None, pr_url: None,
2496        };
2497        insert_node(&conn, &old_node).unwrap();
2498        insert_node(&conn, &new_node).unwrap();
2499
2500        supersede_node(&conn, "new", "old").unwrap();
2501
2502        let old = get_node(&conn, "old").unwrap().unwrap();
2503        assert_eq!(old.status, NodeStatus::Archived);
2504
2505        let edges = get_edges_from(&conn, "new").unwrap();
2506        assert_eq!(edges.len(), 1);
2507        assert_eq!(edges[0].edge_type, EdgeType::Supersedes);
2508        assert_eq!(edges[0].target_id, "old");
2509    }
2510
2511    // ── Project CRUD tests ──
2512
2513    #[test]
2514    fn test_upsert_and_get_project() {
2515        let conn = Connection::open_in_memory().unwrap();
2516        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2517        migrate(&conn).unwrap();
2518
2519        let project = KnowledgeProject {
2520            id: "my-app".to_string(),
2521            path: "/home/user/my-app".to_string(),
2522            remote_url: Some("git@github.com:user/my-app.git".to_string()),
2523            agent_type: "claude_code".to_string(),
2524            last_seen: Utc::now(),
2525        };
2526        upsert_project(&conn, &project).unwrap();
2527
2528        let retrieved = get_project(&conn, "my-app").unwrap().unwrap();
2529        assert_eq!(retrieved.path, "/home/user/my-app");
2530        assert_eq!(retrieved.remote_url.unwrap(), "git@github.com:user/my-app.git");
2531    }
2532
2533    #[test]
2534    fn test_get_project_by_remote_url() {
2535        let conn = Connection::open_in_memory().unwrap();
2536        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2537        migrate(&conn).unwrap();
2538
2539        let project = KnowledgeProject {
2540            id: "my-app".to_string(),
2541            path: "/old/path".to_string(),
2542            remote_url: Some("git@github.com:user/my-app.git".to_string()),
2543            agent_type: "claude_code".to_string(),
2544            last_seen: Utc::now(),
2545        };
2546        upsert_project(&conn, &project).unwrap();
2547
2548        let found = get_project_by_remote_url(&conn, "git@github.com:user/my-app.git").unwrap();
2549        assert!(found.is_some());
2550        assert_eq!(found.unwrap().id, "my-app");
2551    }
2552
2553    #[test]
2554    fn test_get_all_projects() {
2555        let conn = Connection::open_in_memory().unwrap();
2556        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2557        migrate(&conn).unwrap();
2558
2559        for name in ["app-1", "app-2"] {
2560            let project = KnowledgeProject {
2561                id: name.to_string(),
2562                path: format!("/home/{name}"),
2563                remote_url: None,
2564                agent_type: "claude_code".to_string(),
2565                last_seen: Utc::now(),
2566            };
2567            upsert_project(&conn, &project).unwrap();
2568        }
2569
2570        let projects = get_all_projects(&conn).unwrap();
2571        assert_eq!(projects.len(), 2);
2572    }
2573
2574    #[test]
2575    fn test_generate_project_slug() {
2576        assert_eq!(generate_project_slug("/home/user/my-rust-app"), "my-rust-app");
2577        assert_eq!(generate_project_slug("/home/user/My App"), "my-app");
2578        assert_eq!(generate_project_slug("/"), "unnamed-project");
2579    }
2580
2581    #[test]
2582    fn test_migrate_patterns_to_nodes() {
2583        let conn = Connection::open_in_memory().unwrap();
2584        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2585        migrate(&conn).unwrap();
2586
2587        // Insert v1 patterns
2588        let now = Utc::now().to_rfc3339();
2589        conn.execute(
2590            "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
2591             VALUES (?1, ?2, ?3, ?4, 2, ?5, ?5, 'active', '[]', '[]', 'content', ?6, ?7, 0)",
2592            params!["p1", "repetitive_instruction", "Always run tests", 0.85, &now, "claude_md", "my-app"],
2593        ).unwrap();
2594        conn.execute(
2595            "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
2596             VALUES (?1, ?2, ?3, ?4, 1, ?5, ?5, 'discovered', '[]', '[]', 'content', ?6, ?7, 0)",
2597            params!["p2", "recurring_mistake", "Forgets imports", 0.6, &now, "skill", "my-app"],
2598        ).unwrap();
2599        conn.execute(
2600            "INSERT INTO patterns (id, pattern_type, description, confidence, times_seen, first_seen, last_seen, status, source_sessions, related_files, suggested_content, suggested_target, project, generation_failed)
2601             VALUES (?1, ?2, ?3, ?4, 3, ?5, ?5, 'active', '[]', '[]', 'Always use snake_case', ?6, ?7, 0)",
2602            params!["p3", "repetitive_instruction", "Always use snake_case", 0.9, &now, "claude_md", "my-app"],
2603        ).unwrap();
2604
2605        let count = migrate_patterns_to_nodes(&conn).unwrap();
2606        assert_eq!(count, 3);
2607
2608        // p1: RepetitiveInstruction + ClaudeMd -> rule
2609        let node1 = get_node(&conn, "migrated-p1").unwrap().unwrap();
2610        assert_eq!(node1.node_type, NodeType::Rule);
2611        assert_eq!(node1.scope, NodeScope::Project);
2612
2613        // p2: RecurringMistake -> pattern
2614        let node2 = get_node(&conn, "migrated-p2").unwrap().unwrap();
2615        assert_eq!(node2.node_type, NodeType::Pattern);
2616
2617        // p3: confidence >= 0.85 + "always" in content -> directive (override)
2618        let node3 = get_node(&conn, "migrated-p3").unwrap().unwrap();
2619        assert_eq!(node3.node_type, NodeType::Directive);
2620    }
2621
2622    #[test]
2623    fn test_apply_graph_operations() {
2624        let conn = Connection::open_in_memory().unwrap();
2625        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2626        migrate(&conn).unwrap();
2627
2628        let ops = vec![
2629            GraphOperation::CreateNode {
2630                node_type: NodeType::Rule,
2631                scope: NodeScope::Project,
2632                project_id: Some("my-app".to_string()),
2633                content: "Always run tests".to_string(),
2634                confidence: 0.85,
2635            },
2636            GraphOperation::CreateNode {
2637                node_type: NodeType::Pattern,
2638                scope: NodeScope::Global,
2639                project_id: None,
2640                content: "Prefers TDD".to_string(),
2641                confidence: 0.6,
2642            },
2643        ];
2644
2645        let result = apply_graph_operations(&conn, &ops).unwrap();
2646        assert_eq!(result.nodes_created, 2);
2647
2648        let nodes = get_nodes_by_scope(&conn, &NodeScope::Project, Some("my-app"), &[NodeStatus::Active]).unwrap();
2649        assert_eq!(nodes.len(), 1);
2650        assert_eq!(nodes[0].content, "Always run tests");
2651    }
2652
2653    #[test]
2654    fn test_apply_graph_operations_update() {
2655        let conn = Connection::open_in_memory().unwrap();
2656        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2657        migrate(&conn).unwrap();
2658
2659        let node = KnowledgeNode {
2660            id: "node-1".to_string(),
2661            node_type: NodeType::Pattern,
2662            scope: NodeScope::Project,
2663            project_id: Some("app".to_string()),
2664            content: "Old content".to_string(),
2665            confidence: 0.5,
2666            status: NodeStatus::Active,
2667            created_at: Utc::now(),
2668            updated_at: Utc::now(),
2669            projected_at: None,
2670            pr_url: None,
2671        };
2672        insert_node(&conn, &node).unwrap();
2673
2674        let ops = vec![
2675            GraphOperation::UpdateNode {
2676                id: "node-1".to_string(),
2677                confidence: Some(0.8),
2678                content: Some("Updated content".to_string()),
2679            },
2680        ];
2681
2682        let result = apply_graph_operations(&conn, &ops).unwrap();
2683        assert_eq!(result.nodes_updated, 1);
2684
2685        let updated = get_node(&conn, "node-1").unwrap().unwrap();
2686        assert_eq!(updated.confidence, 0.8);
2687        assert_eq!(updated.content, "Updated content");
2688    }
2689
2690    #[test]
2691    fn test_get_nodes_by_status() {
2692        let conn = test_db();
2693
2694        let active_node = KnowledgeNode {
2695            id: "n1".to_string(),
2696            node_type: NodeType::Rule,
2697            scope: NodeScope::Project,
2698            project_id: Some("my-app".to_string()),
2699            content: "Always run tests".to_string(),
2700            confidence: 0.85,
2701            status: NodeStatus::Active,
2702            created_at: Utc::now(),
2703            updated_at: Utc::now(),
2704            projected_at: None,
2705            pr_url: None,
2706        };
2707        let pending_node = KnowledgeNode {
2708            id: "n2".to_string(),
2709            node_type: NodeType::Directive,
2710            scope: NodeScope::Global,
2711            project_id: None,
2712            content: "Use snake_case".to_string(),
2713            confidence: 0.9,
2714            status: NodeStatus::PendingReview,
2715            created_at: Utc::now(),
2716            updated_at: Utc::now(),
2717            projected_at: None,
2718            pr_url: None,
2719        };
2720        let dismissed_node = KnowledgeNode {
2721            id: "n3".to_string(),
2722            node_type: NodeType::Pattern,
2723            scope: NodeScope::Project,
2724            project_id: Some("my-app".to_string()),
2725            content: "Old pattern".to_string(),
2726            confidence: 0.5,
2727            status: NodeStatus::Dismissed,
2728            created_at: Utc::now(),
2729            updated_at: Utc::now(),
2730            projected_at: None,
2731            pr_url: None,
2732        };
2733
2734        insert_node(&conn, &active_node).unwrap();
2735        insert_node(&conn, &pending_node).unwrap();
2736        insert_node(&conn, &dismissed_node).unwrap();
2737
2738        let pending = get_nodes_by_status(&conn, &NodeStatus::PendingReview).unwrap();
2739        assert_eq!(pending.len(), 1);
2740        assert_eq!(pending[0].id, "n2");
2741
2742        let active = get_nodes_by_status(&conn, &NodeStatus::Active).unwrap();
2743        assert_eq!(active.len(), 1);
2744        assert_eq!(active[0].id, "n1");
2745
2746        let pending2 = KnowledgeNode {
2747            id: "n4".to_string(),
2748            node_type: NodeType::Rule,
2749            scope: NodeScope::Project,
2750            project_id: Some("other".to_string()),
2751            content: "Second pending".to_string(),
2752            confidence: 0.95,
2753            status: NodeStatus::PendingReview,
2754            created_at: Utc::now(),
2755            updated_at: Utc::now(),
2756            projected_at: None,
2757            pr_url: None,
2758        };
2759        insert_node(&conn, &pending2).unwrap();
2760        let pending_all = get_nodes_by_status(&conn, &NodeStatus::PendingReview).unwrap();
2761        assert_eq!(pending_all.len(), 2);
2762        assert_eq!(pending_all[0].id, "n4"); // Higher confidence first
2763    }
2764
2765    #[test]
2766    fn test_migrate_v4_to_v5_adds_projection_columns() {
2767        let conn = Connection::open_in_memory().unwrap();
2768        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
2769        migrate(&conn).unwrap();
2770
2771        // Insert a node — should support new columns
2772        let node = KnowledgeNode {
2773            id: "test-1".to_string(),
2774            node_type: NodeType::Rule,
2775            scope: NodeScope::Global,
2776            project_id: None,
2777            content: "test rule".to_string(),
2778            confidence: 0.8,
2779            status: NodeStatus::Active,
2780            created_at: Utc::now(),
2781            updated_at: Utc::now(),
2782            projected_at: None,
2783            pr_url: None,
2784        };
2785        insert_node(&conn, &node).unwrap();
2786
2787        let retrieved = get_node(&conn, "test-1").unwrap().unwrap();
2788        assert!(retrieved.projected_at.is_none());
2789        assert!(retrieved.pr_url.is_none());
2790
2791        // Verify schema version
2792        let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0)).unwrap();
2793        assert_eq!(version, 5);
2794    }
2795
2796    fn test_node(id: &str, status: NodeStatus, projected_at: Option<String>, pr_url: Option<String>) -> KnowledgeNode {
2797        KnowledgeNode {
2798            id: id.to_string(),
2799            node_type: NodeType::Rule,
2800            scope: NodeScope::Global,
2801            project_id: None,
2802            content: format!("Content for {}", id),
2803            confidence: 0.8,
2804            status,
2805            created_at: Utc::now(),
2806            updated_at: Utc::now(),
2807            projected_at,
2808            pr_url,
2809        }
2810    }
2811
2812    #[test]
2813    fn test_get_unprojected_nodes() {
2814        let conn = test_db();
2815
2816        // Active node with no projected_at — should be returned
2817        let active_unprojected = test_node("n1", NodeStatus::Active, None, None);
2818        // Active node with projected_at set — should NOT be returned
2819        let active_projected = test_node("n2", NodeStatus::Active, Some(Utc::now().to_rfc3339()), None);
2820        // PendingReview node with no projected_at — should NOT be returned (wrong status)
2821        let pending = test_node("n3", NodeStatus::PendingReview, None, None);
2822
2823        insert_node(&conn, &active_unprojected).unwrap();
2824        insert_node(&conn, &active_projected).unwrap();
2825        insert_node(&conn, &pending).unwrap();
2826
2827        let nodes = get_unprojected_nodes(&conn).unwrap();
2828        assert_eq!(nodes.len(), 1);
2829        assert_eq!(nodes[0].id, "n1");
2830    }
2831
2832    #[test]
2833    fn test_mark_node_projected() {
2834        let conn = test_db();
2835
2836        let node = test_node("n1", NodeStatus::Active, None, None);
2837        insert_node(&conn, &node).unwrap();
2838
2839        mark_node_projected(&conn, "n1").unwrap();
2840
2841        let retrieved = get_node(&conn, "n1").unwrap().unwrap();
2842        assert!(retrieved.projected_at.is_some());
2843        assert!(retrieved.pr_url.is_none());
2844    }
2845
2846    #[test]
2847    fn test_mark_node_projected_with_pr() {
2848        let conn = test_db();
2849
2850        let node = test_node("n1", NodeStatus::Active, None, None);
2851        insert_node(&conn, &node).unwrap();
2852
2853        mark_node_projected_with_pr(&conn, "n1", "https://github.com/test/pull/42").unwrap();
2854
2855        let retrieved = get_node(&conn, "n1").unwrap().unwrap();
2856        assert!(retrieved.projected_at.is_some());
2857        assert_eq!(retrieved.pr_url, Some("https://github.com/test/pull/42".to_string()));
2858    }
2859
2860    #[test]
2861    fn test_dismiss_nodes_for_pr() {
2862        let conn = test_db();
2863
2864        let pr_url = "https://github.com/test/pull/99";
2865        let node1 = test_node("n1", NodeStatus::Active, Some(Utc::now().to_rfc3339()), Some(pr_url.to_string()));
2866        let node2 = test_node("n2", NodeStatus::Active, Some(Utc::now().to_rfc3339()), Some(pr_url.to_string()));
2867
2868        insert_node(&conn, &node1).unwrap();
2869        insert_node(&conn, &node2).unwrap();
2870
2871        dismiss_nodes_for_pr(&conn, pr_url).unwrap();
2872
2873        let n1 = get_node(&conn, "n1").unwrap().unwrap();
2874        let n2 = get_node(&conn, "n2").unwrap().unwrap();
2875
2876        assert_eq!(n1.status, NodeStatus::Dismissed);
2877        assert!(n1.pr_url.is_none());
2878        assert_eq!(n2.status, NodeStatus::Dismissed);
2879        assert!(n2.pr_url.is_none());
2880    }
2881
2882    #[test]
2883    fn test_clear_node_pr() {
2884        let conn = test_db();
2885
2886        let pr_url = "https://github.com/test/pull/7";
2887        let node = test_node("n1", NodeStatus::Active, Some(Utc::now().to_rfc3339()), Some(pr_url.to_string()));
2888        insert_node(&conn, &node).unwrap();
2889
2890        clear_node_pr(&conn, pr_url).unwrap();
2891
2892        let retrieved = get_node(&conn, "n1").unwrap().unwrap();
2893        assert!(retrieved.pr_url.is_none());
2894        assert_eq!(retrieved.status, NodeStatus::Active);
2895    }
2896}