Skip to main content

sc/storage/
sqlite.rs

1//! SQLite storage implementation.
2//!
3//! This module provides the main storage backend for SaveContext using SQLite.
4//! It follows the MutationContext pattern for transaction discipline and audit logging.
5
6use crate::error::{Error, Result};
7use crate::model::{Plan, PlanStatus, Project};
8use crate::storage::events::{insert_event, Event, EventType};
9use crate::storage::schema::apply_schema;
10use rusqlite::{Connection, OptionalExtension, Transaction};
11use std::collections::HashSet;
12use std::path::Path;
13use std::time::Duration;
14
15/// SQLite-based storage backend.
16#[derive(Debug)]
17pub struct SqliteStorage {
18    conn: Connection,
19}
20
21/// Context for a mutation operation, tracking side effects.
22///
23/// This struct is passed to mutation closures to:
24/// - Track which entities were modified (dirty tracking for sync)
25/// - Record audit events for history
26/// - Manage transaction state
27pub struct MutationContext {
28    /// Name of the operation being performed.
29    pub op_name: String,
30    /// Actor performing the operation (agent ID, user, etc.).
31    pub actor: String,
32    /// Events to write at the end of the transaction.
33    pub events: Vec<Event>,
34    /// IDs of entities marked dirty for sync export.
35    pub dirty_sessions: HashSet<String>,
36    pub dirty_issues: HashSet<String>,
37    pub dirty_items: HashSet<String>,
38    pub dirty_plans: HashSet<String>,
39}
40
41impl MutationContext {
42    /// Create a new mutation context.
43    #[must_use]
44    pub fn new(op_name: &str, actor: &str) -> Self {
45        Self {
46            op_name: op_name.to_string(),
47            actor: actor.to_string(),
48            events: Vec::new(),
49            dirty_sessions: HashSet::new(),
50            dirty_issues: HashSet::new(),
51            dirty_items: HashSet::new(),
52            dirty_plans: HashSet::new(),
53        }
54    }
55
56    /// Record an event for this operation.
57    pub fn record_event(
58        &mut self,
59        entity_type: &str,
60        entity_id: &str,
61        event_type: EventType,
62    ) {
63        self.events.push(Event::new(
64            entity_type,
65            entity_id,
66            event_type,
67            &self.actor,
68        ));
69    }
70
71    /// Record an event with old/new values for field tracking.
72    pub fn record_change(
73        &mut self,
74        entity_type: &str,
75        entity_id: &str,
76        event_type: EventType,
77        old_value: Option<String>,
78        new_value: Option<String>,
79    ) {
80        self.events.push(
81            Event::new(entity_type, entity_id, event_type, &self.actor)
82                .with_values(old_value, new_value),
83        );
84    }
85
86    /// Mark a session as dirty for sync export.
87    pub fn mark_session_dirty(&mut self, session_id: &str) {
88        self.dirty_sessions.insert(session_id.to_string());
89    }
90
91    /// Mark an issue as dirty for sync export.
92    pub fn mark_issue_dirty(&mut self, issue_id: &str) {
93        self.dirty_issues.insert(issue_id.to_string());
94    }
95
96    /// Mark a context item as dirty for sync export.
97    pub fn mark_item_dirty(&mut self, item_id: &str) {
98        self.dirty_items.insert(item_id.to_string());
99    }
100
101    /// Mark a plan as dirty for sync export.
102    pub fn mark_plan_dirty(&mut self, plan_id: &str) {
103        self.dirty_plans.insert(plan_id.to_string());
104    }
105}
106
107/// Statistics from backfilling dirty records for a project.
108///
109/// Returned by `backfill_dirty_for_project` to indicate how many records
110/// were marked dirty for sync export.
111#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
112pub struct BackfillStats {
113    /// Number of sessions marked dirty.
114    pub sessions: usize,
115    /// Number of issues marked dirty.
116    pub issues: usize,
117    /// Number of context items marked dirty.
118    pub context_items: usize,
119    /// Number of plans marked dirty.
120    pub plans: usize,
121}
122
123impl BackfillStats {
124    /// Returns true if any records were marked dirty.
125    #[must_use]
126    pub fn any(&self) -> bool {
127        self.sessions > 0 || self.issues > 0 || self.context_items > 0 || self.plans > 0
128    }
129
130    /// Returns total number of records marked dirty.
131    #[must_use]
132    pub fn total(&self) -> usize {
133        self.sessions + self.issues + self.context_items + self.plans
134    }
135}
136
137/// Counts of records for a project.
138///
139/// Used by `get_project_counts` to return summary statistics about
140/// a project's data.
141#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
142pub struct ProjectCounts {
143    /// Number of sessions.
144    pub sessions: usize,
145    /// Number of issues.
146    pub issues: usize,
147    /// Number of context items.
148    pub context_items: usize,
149    /// Number of memories.
150    pub memories: usize,
151    /// Number of checkpoints.
152    pub checkpoints: usize,
153}
154
155impl ProjectCounts {
156    /// Returns total number of records.
157    #[must_use]
158    pub fn total(&self) -> usize {
159        self.sessions + self.issues + self.context_items + self.memories + self.checkpoints
160    }
161}
162
163impl SqliteStorage {
164    /// Open a database at the given path.
165    ///
166    /// Creates the database and applies schema if it doesn't exist.
167    ///
168    /// # Errors
169    ///
170    /// Returns an error if the connection cannot be established or schema fails.
171    pub fn open(path: &Path) -> Result<Self> {
172        Self::open_with_timeout(path, None)
173    }
174
175    /// Open a database with an optional busy timeout.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if the connection cannot be established or schema fails.
180    pub fn open_with_timeout(path: &Path, timeout_ms: Option<u64>) -> Result<Self> {
181        let conn = Connection::open(path)?;
182
183        if let Some(timeout) = timeout_ms {
184            conn.busy_timeout(Duration::from_millis(timeout))?;
185        } else {
186            // Default 5 second timeout
187            conn.busy_timeout(Duration::from_secs(5))?;
188        }
189
190        apply_schema(&conn)?;
191        Ok(Self { conn })
192    }
193
194    /// Open an in-memory database (for testing).
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if the connection cannot be established.
199    pub fn open_memory() -> Result<Self> {
200        let conn = Connection::open_in_memory()?;
201        apply_schema(&conn)?;
202        Ok(Self { conn })
203    }
204
205    /// Get a reference to the underlying connection (for read operations).
206    #[must_use]
207    pub fn conn(&self) -> &Connection {
208        &self.conn
209    }
210
211    /// Execute a mutation with the transaction protocol.
212    ///
213    /// This method:
214    /// 1. Begins an IMMEDIATE transaction (for write locking)
215    /// 2. Executes the mutation closure
216    /// 3. Writes audit events
217    /// 4. Updates dirty tracking tables
218    /// 5. Commits (or rolls back on error)
219    ///
220    /// # Errors
221    ///
222    /// Returns an error if any step fails. The transaction is rolled back on error.
223    pub fn mutate<F, R>(&mut self, op: &str, actor: &str, f: F) -> Result<R>
224    where
225        F: FnOnce(&Transaction, &mut MutationContext) -> Result<R>,
226    {
227        let tx = self
228            .conn
229            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
230
231        let mut ctx = MutationContext::new(op, actor);
232
233        // Execute the mutation
234        let result = f(&tx, &mut ctx)?;
235
236        // Write audit events
237        for event in &ctx.events {
238            insert_event(&tx, event)?;
239        }
240
241        // Dirty tracking is handled by triggers in schema.sql
242        // But we can explicitly mark items here if triggers miss something
243
244        // Commit
245        tx.commit()?;
246
247        Ok(result)
248    }
249
250    // ==================
251    // Session Operations
252    // ==================
253
254    /// Create a new session.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if the insert fails.
259    pub fn create_session(
260        &mut self,
261        id: &str,
262        name: &str,
263        description: Option<&str>,
264        project_path: Option<&str>,
265        branch: Option<&str>,
266        actor: &str,
267    ) -> Result<()> {
268        let now = chrono::Utc::now().timestamp_millis();
269
270        self.mutate("create_session", actor, |tx, ctx| {
271            tx.execute(
272                "INSERT INTO sessions (id, name, description, project_path, branch, status, created_at, updated_at)
273                 VALUES (?1, ?2, ?3, ?4, ?5, 'active', ?6, ?6)",
274                rusqlite::params![id, name, description, project_path, branch, now],
275            )?;
276
277            // Also insert into session_projects junction table for project-based filtering
278            if let Some(path) = project_path {
279                tx.execute(
280                    "INSERT INTO session_projects (session_id, project_path, added_at) VALUES (?1, ?2, ?3)",
281                    rusqlite::params![id, path, now],
282                )?;
283            }
284
285            ctx.record_event("session", id, EventType::SessionCreated);
286            ctx.mark_session_dirty(id);
287
288            Ok(())
289        })
290    }
291
292    /// Get a session by ID.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if the query fails.
297    pub fn get_session(&self, id: &str) -> Result<Option<Session>> {
298        let mut stmt = self.conn.prepare(
299            "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
300             FROM sessions WHERE id = ?1",
301        )?;
302
303        let session = stmt
304            .query_row([id], |row| {
305                Ok(Session {
306                    id: row.get(0)?,
307                    name: row.get(1)?,
308                    description: row.get(2)?,
309                    branch: row.get(3)?,
310                    channel: row.get(4)?,
311                    project_path: row.get(5)?,
312                    status: row.get(6)?,
313                    ended_at: row.get(7)?,
314                    created_at: row.get(8)?,
315                    updated_at: row.get(9)?,
316                })
317            })
318            .optional()?;
319
320        Ok(session)
321    }
322
323    /// List sessions with optional filters.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if the query fails.
328    pub fn list_sessions(
329        &self,
330        project_path: Option<&str>,
331        status: Option<&str>,
332        limit: Option<u32>,
333    ) -> Result<Vec<Session>> {
334        self.list_sessions_with_search(project_path, status, limit, None)
335    }
336
337    /// List sessions with optional filters and search.
338    ///
339    /// Uses the `session_projects` junction table for project path filtering,
340    /// matching the MCP server's `listSessionsByPaths` behavior.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if the query fails.
345    pub fn list_sessions_with_search(
346        &self,
347        project_path: Option<&str>,
348        status: Option<&str>,
349        limit: Option<u32>,
350        search: Option<&str>,
351    ) -> Result<Vec<Session>> {
352        let limit = limit.unwrap_or(50);
353
354        // Build dynamic SQL query using junction table for project filtering
355        let mut conditions: Vec<String> = Vec::new();
356        let mut params: Vec<String> = Vec::new();
357        let mut param_idx = 1;
358
359        // Determine if we need to join with session_projects
360        let (from_clause, select_distinct) = if let Some(path) = project_path {
361            // Join with session_projects to find sessions associated with this project
362            conditions.push(format!("sp.project_path = ?{param_idx}"));
363            params.push(path.to_string());
364            param_idx += 1;
365            (
366                "sessions s JOIN session_projects sp ON s.id = sp.session_id".to_string(),
367                "DISTINCT ",
368            )
369        } else {
370            // No project filter - query sessions directly
371            ("sessions s".to_string(), "")
372        };
373
374        if let Some(st) = status {
375            conditions.push(format!("s.status = ?{param_idx}"));
376            params.push(st.to_string());
377            param_idx += 1;
378        }
379
380        if let Some(search_term) = search {
381            // Case-insensitive search matching MCP server behavior
382            conditions.push(format!(
383                "(s.name LIKE ?{param_idx} COLLATE NOCASE OR s.description LIKE ?{param_idx} COLLATE NOCASE)"
384            ));
385            params.push(format!("%{search_term}%"));
386            param_idx += 1;
387        }
388
389        let where_clause = if conditions.is_empty() {
390            " WHERE 1=1".to_string()
391        } else {
392            format!(" WHERE {}", conditions.join(" AND "))
393        };
394
395        let sql = format!(
396            "SELECT {select_distinct}s.id, s.name, s.description, s.branch, s.channel, s.project_path, s.status, s.ended_at, s.created_at, s.updated_at
397             FROM {from_clause}{where_clause}
398             ORDER BY s.updated_at DESC LIMIT ?{param_idx}"
399        );
400        params.push(limit.to_string());
401
402        let mut stmt = self.conn.prepare(&sql)?;
403        let params_refs: Vec<&dyn rusqlite::ToSql> = params
404            .iter()
405            .map(|s| s as &dyn rusqlite::ToSql)
406            .collect();
407
408        let rows = stmt.query_map(params_refs.as_slice(), |row| {
409            Ok(Session {
410                id: row.get(0)?,
411                name: row.get(1)?,
412                description: row.get(2)?,
413                branch: row.get(3)?,
414                channel: row.get(4)?,
415                project_path: row.get(5)?,
416                status: row.get(6)?,
417                ended_at: row.get(7)?,
418                created_at: row.get(8)?,
419                updated_at: row.get(9)?,
420            })
421        })?;
422
423        rows.collect::<std::result::Result<Vec<_>, _>>()
424            .map_err(Error::from)
425    }
426
427    /// Update session status.
428    ///
429    /// # Errors
430    ///
431    /// Returns an error if the update fails or session not found.
432    pub fn update_session_status(
433        &mut self,
434        id: &str,
435        status: &str,
436        actor: &str,
437    ) -> Result<()> {
438        let now = chrono::Utc::now().timestamp_millis();
439        let ended_at = if status == "completed" || status == "paused" {
440            Some(now)
441        } else {
442            None
443        };
444
445        self.mutate("update_session_status", actor, |tx, ctx| {
446            let rows = tx.execute(
447                "UPDATE sessions SET status = ?1, ended_at = ?2, updated_at = ?3 WHERE id = ?4",
448                rusqlite::params![status, ended_at, now, id],
449            )?;
450
451            if rows == 0 {
452                return Err(Error::SessionNotFound { id: id.to_string() });
453            }
454
455            let event_type = match status {
456                "paused" => EventType::SessionPaused,
457                "completed" => EventType::SessionCompleted,
458                _ => EventType::SessionUpdated,
459            };
460            ctx.record_event("session", id, event_type);
461            ctx.mark_session_dirty(id);
462
463            Ok(())
464        })
465    }
466
467    /// Rename a session.
468    ///
469    /// # Errors
470    ///
471    /// Returns an error if the update fails or session not found.
472    pub fn rename_session(
473        &mut self,
474        id: &str,
475        new_name: &str,
476        actor: &str,
477    ) -> Result<()> {
478        let now = chrono::Utc::now().timestamp_millis();
479
480        self.mutate("rename_session", actor, |tx, ctx| {
481            let rows = tx.execute(
482                "UPDATE sessions SET name = ?1, updated_at = ?2 WHERE id = ?3",
483                rusqlite::params![new_name, now, id],
484            )?;
485
486            if rows == 0 {
487                return Err(Error::SessionNotFound { id: id.to_string() });
488            }
489
490            ctx.record_event("session", id, EventType::SessionUpdated);
491            ctx.mark_session_dirty(id);
492
493            Ok(())
494        })
495    }
496
497    /// Delete a session and all related data.
498    ///
499    /// This cascades to delete:
500    /// - Context items in the session
501    /// - Checkpoints for the session
502    /// - Session project paths
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if the session doesn't exist or can't be deleted.
507    pub fn delete_session(&mut self, id: &str, actor: &str) -> Result<()> {
508        self.mutate("delete_session", actor, |tx, ctx| {
509            // Verify session exists
510            let exists: bool = tx
511                .query_row(
512                    "SELECT 1 FROM sessions WHERE id = ?1",
513                    [id],
514                    |_| Ok(true),
515                )
516                .unwrap_or(false);
517
518            if !exists {
519                return Err(Error::SessionNotFound { id: id.to_string() });
520            }
521
522            // Delete context items for this session
523            tx.execute(
524                "DELETE FROM context_items WHERE session_id = ?1",
525                [id],
526            )?;
527
528            // Delete checkpoints for this session
529            tx.execute(
530                "DELETE FROM checkpoints WHERE session_id = ?1",
531                [id],
532            )?;
533
534            // Delete session paths
535            tx.execute(
536                "DELETE FROM session_projects WHERE session_id = ?1",
537                [id],
538            )?;
539
540            // Delete the session itself
541            tx.execute("DELETE FROM sessions WHERE id = ?1", [id])?;
542
543            ctx.record_event("session", id, EventType::SessionDeleted);
544
545            Ok(())
546        })
547    }
548
549    /// Add a project path to a session (for multi-project sessions).
550    ///
551    /// # Errors
552    ///
553    /// Returns an error if the session doesn't exist or the path is already added.
554    pub fn add_session_path(
555        &mut self,
556        session_id: &str,
557        project_path: &str,
558        actor: &str,
559    ) -> Result<()> {
560        let now = chrono::Utc::now().timestamp_millis();
561
562        self.mutate("add_session_path", actor, |tx, ctx| {
563            // Verify session exists
564            let exists: bool = tx
565                .query_row(
566                    "SELECT 1 FROM sessions WHERE id = ?1",
567                    [session_id],
568                    |_| Ok(true),
569                )
570                .unwrap_or(false);
571
572            if !exists {
573                return Err(Error::SessionNotFound { id: session_id.to_string() });
574            }
575
576            // Insert the path (will fail if already exists due to PRIMARY KEY constraint)
577            let result = tx.execute(
578                "INSERT INTO session_projects (session_id, project_path, added_at) VALUES (?1, ?2, ?3)",
579                rusqlite::params![session_id, project_path, now],
580            );
581
582            match result {
583                Ok(_) => {
584                    ctx.record_event("session", session_id, EventType::SessionPathAdded);
585                    ctx.mark_session_dirty(session_id);
586                    Ok(())
587                }
588                Err(rusqlite::Error::SqliteFailure(err, _))
589                    if err.code == rusqlite::ErrorCode::ConstraintViolation =>
590                {
591                    Err(Error::Other(format!(
592                        "Path already added to session: {project_path}"
593                    )))
594                }
595                Err(e) => Err(e.into()),
596            }
597        })
598    }
599
600    /// Remove a project path from a session.
601    ///
602    /// Cannot remove the last path (sessions must have at least the primary path).
603    ///
604    /// # Errors
605    ///
606    /// Returns an error if the session doesn't exist or this is the last path.
607    pub fn remove_session_path(
608        &mut self,
609        session_id: &str,
610        project_path: &str,
611        actor: &str,
612    ) -> Result<()> {
613        self.mutate("remove_session_path", actor, |tx, ctx| {
614            // Verify session exists
615            let session_path: Option<String> = tx
616                .query_row(
617                    "SELECT project_path FROM sessions WHERE id = ?1",
618                    [session_id],
619                    |row| row.get(0),
620                )
621                .optional()?;
622
623            let primary_path = session_path.ok_or_else(|| Error::SessionNotFound {
624                id: session_id.to_string(),
625            })?;
626
627            // Cannot remove the primary project path from sessions table
628            if primary_path == project_path {
629                return Err(Error::Other(
630                    "Cannot remove primary project path. Use delete_session instead.".to_string(),
631                ));
632            }
633
634            // Delete from session_projects
635            let rows = tx.execute(
636                "DELETE FROM session_projects WHERE session_id = ?1 AND project_path = ?2",
637                rusqlite::params![session_id, project_path],
638            )?;
639
640            if rows == 0 {
641                return Err(Error::Other(format!(
642                    "Path not found in session: {project_path}"
643                )));
644            }
645
646            ctx.record_event("session", session_id, EventType::SessionPathRemoved);
647            ctx.mark_session_dirty(session_id);
648
649            Ok(())
650        })
651    }
652
653    /// Get all project paths for a session.
654    ///
655    /// Returns the primary path from the session plus any additional paths from session_projects.
656    pub fn get_session_paths(&self, session_id: &str) -> Result<Vec<String>> {
657        let conn = self.conn();
658
659        // Get primary path from session
660        let primary_path: Option<String> = conn
661            .query_row(
662                "SELECT project_path FROM sessions WHERE id = ?1",
663                [session_id],
664                |row| row.get(0),
665            )
666            .optional()?;
667
668        let Some(primary) = primary_path else {
669            return Err(Error::SessionNotFound { id: session_id.to_string() });
670        };
671
672        // Get additional paths
673        let mut stmt = conn.prepare(
674            "SELECT project_path FROM session_projects WHERE session_id = ?1 ORDER BY added_at",
675        )?;
676
677        let additional_paths: Vec<String> = stmt
678            .query_map([session_id], |row| row.get(0))?
679            .filter_map(|r| r.ok())
680            .collect();
681
682        // Combine: primary path first, then additional
683        let mut paths = vec![primary];
684        paths.extend(additional_paths);
685
686        Ok(paths)
687    }
688
689    // =======================
690    // Context Item Operations
691    // =======================
692
693    /// Save a context item (upsert).
694    ///
695    /// # Errors
696    ///
697    /// Returns an error if the operation fails.
698    pub fn save_context_item(
699        &mut self,
700        id: &str,
701        session_id: &str,
702        key: &str,
703        value: &str,
704        category: Option<&str>,
705        priority: Option<&str>,
706        actor: &str,
707    ) -> Result<()> {
708        let now = chrono::Utc::now().timestamp_millis();
709        let category = category.unwrap_or("note");
710        let priority = priority.unwrap_or("normal");
711        let size = value.len() as i64;
712
713        self.mutate("save_context_item", actor, |tx, ctx| {
714            // Check if exists for event type
715            let exists: bool = tx
716                .prepare("SELECT 1 FROM context_items WHERE session_id = ?1 AND key = ?2")?
717                .exists(rusqlite::params![session_id, key])?;
718
719            tx.execute(
720                "INSERT INTO context_items (id, session_id, key, value, category, priority, size, created_at, updated_at)
721                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)
722                 ON CONFLICT(session_id, key) DO UPDATE SET
723                   value = excluded.value,
724                   category = excluded.category,
725                   priority = excluded.priority,
726                   size = excluded.size,
727                   updated_at = excluded.updated_at",
728                rusqlite::params![id, session_id, key, value, category, priority, size, now],
729            )?;
730
731            let event_type = if exists {
732                EventType::ItemUpdated
733            } else {
734                EventType::ItemCreated
735            };
736            ctx.record_event("context_item", id, event_type);
737            ctx.mark_item_dirty(id);
738
739            Ok(())
740        })
741    }
742
743    /// Look up the actual item ID by session + key.
744    ///
745    /// Needed after upserts where ON CONFLICT keeps the original ID.
746    pub fn get_item_id_by_key(&self, session_id: &str, key: &str) -> Result<Option<String>> {
747        let id = self.conn.query_row(
748            "SELECT id FROM context_items WHERE session_id = ?1 AND key = ?2",
749            rusqlite::params![session_id, key],
750            |row| row.get(0),
751        ).optional()?;
752        Ok(id)
753    }
754
755    /// Get all context items for a session with their fast-tier embeddings (if any).
756    ///
757    /// Single LEFT JOIN query — items without embeddings get `None`.
758    /// Only fetches chunk_index=0 (the primary embedding per item).
759    pub fn get_items_with_fast_embeddings(
760        &self,
761        session_id: &str,
762    ) -> Result<Vec<(ContextItem, Option<Vec<f32>>)>> {
763        let sql = "SELECT ci.id, ci.session_id, ci.key, ci.value, ci.category, ci.priority,
764                          ci.channel, ci.tags, ci.size, ci.created_at, ci.updated_at,
765                          ec.embedding
766                   FROM context_items ci
767                   LEFT JOIN embedding_chunks_fast ec ON ec.item_id = ci.id AND ec.chunk_index = 0
768                   WHERE ci.session_id = ?1
769                   ORDER BY ci.updated_at DESC";
770
771        let mut stmt = self.conn.prepare(sql)?;
772        let rows = stmt.query_map(rusqlite::params![session_id], |row| {
773            let item = ContextItem {
774                id: row.get(0)?,
775                session_id: row.get(1)?,
776                key: row.get(2)?,
777                value: row.get(3)?,
778                category: row.get(4)?,
779                priority: row.get(5)?,
780                channel: row.get(6)?,
781                tags: row.get(7)?,
782                size: row.get(8)?,
783                created_at: row.get(9)?,
784                updated_at: row.get(10)?,
785            };
786
787            let embedding: Option<Vec<f32>> = row.get::<_, Option<Vec<u8>>>(11)?
788                .map(|blob| {
789                    blob.chunks_exact(4)
790                        .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
791                        .collect()
792                });
793
794            Ok((item, embedding))
795        })?;
796
797        let mut results = Vec::new();
798        for row in rows {
799            results.push(row?);
800        }
801        Ok(results)
802    }
803
804    /// Get context items for a session.
805    ///
806    /// # Errors
807    ///
808    /// Returns an error if the query fails.
809    pub fn get_context_items(
810        &self,
811        session_id: &str,
812        category: Option<&str>,
813        priority: Option<&str>,
814        limit: Option<u32>,
815    ) -> Result<Vec<ContextItem>> {
816        let limit = limit.unwrap_or(100);
817
818        let mut sql = String::from(
819            "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
820             FROM context_items WHERE session_id = ?1",
821        );
822
823        let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(session_id.to_string())];
824
825        if let Some(cat) = category {
826            sql.push_str(" AND category = ?");
827            params.push(Box::new(cat.to_string()));
828        }
829
830        if let Some(pri) = priority {
831            sql.push_str(" AND priority = ?");
832            params.push(Box::new(pri.to_string()));
833        }
834
835        sql.push_str(" ORDER BY created_at DESC LIMIT ?");
836        params.push(Box::new(limit));
837
838        let mut stmt = self.conn.prepare(&sql)?;
839        let params_refs: Vec<&dyn rusqlite::ToSql> = params
840            .iter()
841            .map(|b| b.as_ref())
842            .collect();
843
844        let rows = stmt.query_map(params_refs.as_slice(), |row| {
845            Ok(ContextItem {
846                id: row.get(0)?,
847                session_id: row.get(1)?,
848                key: row.get(2)?,
849                value: row.get(3)?,
850                category: row.get(4)?,
851                priority: row.get(5)?,
852                channel: row.get(6)?,
853                tags: row.get(7)?,
854                size: row.get(8)?,
855                created_at: row.get(9)?,
856                updated_at: row.get(10)?,
857            })
858        })?;
859
860        rows.collect::<std::result::Result<Vec<_>, _>>()
861            .map_err(Error::from)
862    }
863
864    /// Delete a context item.
865    ///
866    /// # Errors
867    ///
868    /// Returns an error if the delete fails.
869    pub fn delete_context_item(
870        &mut self,
871        session_id: &str,
872        key: &str,
873        actor: &str,
874    ) -> Result<()> {
875        self.mutate("delete_context_item", actor, |tx, ctx| {
876            // Get ID and project_path for tracking
877            let info: Option<(String, Option<String>)> = tx
878                .query_row(
879                    "SELECT ci.id, s.project_path
880                     FROM context_items ci
881                     JOIN sessions s ON ci.session_id = s.id
882                     WHERE ci.session_id = ?1 AND ci.key = ?2",
883                    rusqlite::params![session_id, key],
884                    |row| Ok((row.get(0)?, row.get(1)?)),
885                )
886                .optional()?;
887
888            let rows = tx.execute(
889                "DELETE FROM context_items WHERE session_id = ?1 AND key = ?2",
890                rusqlite::params![session_id, key],
891            )?;
892
893            if rows > 0 {
894                if let Some((item_id, project_path)) = info {
895                    ctx.record_event("context_item", &item_id, EventType::ItemDeleted);
896
897                    // Record for sync export
898                    if let Some(ref path) = project_path {
899                        let now = chrono::Utc::now().timestamp_millis();
900                        tx.execute(
901                            "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
902                             VALUES ('context_item', ?1, ?2, ?3, ?4, 0)
903                             ON CONFLICT(entity_type, entity_id) DO UPDATE SET
904                               deleted_at = excluded.deleted_at,
905                               deleted_by = excluded.deleted_by,
906                               exported = 0",
907                            rusqlite::params![item_id, path, now, ctx.actor],
908                        )?;
909                    }
910                }
911            }
912
913            Ok(())
914        })
915    }
916
917    /// Update a context item's value, category, priority, or channel.
918    ///
919    /// # Errors
920    ///
921    /// Returns an error if the update fails.
922    pub fn update_context_item(
923        &mut self,
924        session_id: &str,
925        key: &str,
926        value: Option<&str>,
927        category: Option<&str>,
928        priority: Option<&str>,
929        channel: Option<&str>,
930        actor: &str,
931    ) -> Result<()> {
932        self.mutate("update_context_item", actor, |tx, ctx| {
933            let now = chrono::Utc::now().timestamp_millis();
934
935            // Build dynamic UPDATE query - collect field names and params separately
936            let mut set_parts: Vec<&str> = vec!["updated_at"];
937            let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
938
939            if let Some(v) = value {
940                set_parts.push("value");
941                set_parts.push("size");
942                params.push(Box::new(v.to_string()));
943                params.push(Box::new(v.len() as i64));
944            }
945            if let Some(c) = category {
946                set_parts.push("category");
947                params.push(Box::new(c.to_string()));
948            }
949            if let Some(p) = priority {
950                set_parts.push("priority");
951                params.push(Box::new(p.to_string()));
952            }
953            if let Some(ch) = channel {
954                set_parts.push("channel");
955                params.push(Box::new(ch.to_string()));
956            }
957
958            // Get item ID for event tracking
959            let item_id: Option<String> = tx
960                .query_row(
961                    "SELECT id FROM context_items WHERE session_id = ?1 AND key = ?2",
962                    rusqlite::params![session_id, key],
963                    |row| row.get(0),
964                )
965                .optional()?;
966
967            if item_id.is_none() {
968                return Err(Error::Database(rusqlite::Error::QueryReturnedNoRows));
969            }
970
971            // Build SET clause with numbered placeholders
972            let set_clause: String = set_parts
973                .iter()
974                .enumerate()
975                .map(|(i, field)| format!("{} = ?{}", field, i + 1))
976                .collect::<Vec<_>>()
977                .join(", ");
978
979            let param_count = params.len();
980            let query = format!(
981                "UPDATE context_items SET {} WHERE session_id = ?{} AND key = ?{}",
982                set_clause,
983                param_count + 1,
984                param_count + 2
985            );
986
987            params.push(Box::new(session_id.to_string()));
988            params.push(Box::new(key.to_string()));
989
990            let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
991            tx.execute(&query, param_refs.as_slice())?;
992
993            if let Some(id) = item_id {
994                ctx.record_event("context_item", &id, EventType::ItemUpdated);
995            }
996
997            Ok(())
998        })
999    }
1000
1001    /// Add tags to a context item.
1002    ///
1003    /// # Errors
1004    ///
1005    /// Returns an error if the update fails.
1006    pub fn add_tags_to_item(
1007        &mut self,
1008        session_id: &str,
1009        key: &str,
1010        tags_to_add: &[String],
1011        actor: &str,
1012    ) -> Result<()> {
1013        self.mutate("add_tags_to_item", actor, |tx, ctx| {
1014            let now = chrono::Utc::now().timestamp_millis();
1015
1016            // Get current tags
1017            let (item_id, current_tags): (String, String) = tx.query_row(
1018                "SELECT id, tags FROM context_items WHERE session_id = ?1 AND key = ?2",
1019                rusqlite::params![session_id, key],
1020                |row| Ok((row.get(0)?, row.get::<_, Option<String>>(1)?.unwrap_or_else(|| "[]".to_string()))),
1021            )?;
1022
1023            // Parse current tags
1024            let mut tags: Vec<String> = serde_json::from_str(&current_tags).unwrap_or_default();
1025
1026            // Add new tags (avoiding duplicates)
1027            for tag in tags_to_add {
1028                if !tags.contains(tag) {
1029                    tags.push(tag.clone());
1030                }
1031            }
1032
1033            // Update
1034            let new_tags = serde_json::to_string(&tags).unwrap_or_else(|_| "[]".to_string());
1035            tx.execute(
1036                "UPDATE context_items SET tags = ?1, updated_at = ?2 WHERE id = ?3",
1037                rusqlite::params![new_tags, now, item_id],
1038            )?;
1039
1040            ctx.record_event("context_item", &item_id, EventType::ItemUpdated);
1041
1042            Ok(())
1043        })
1044    }
1045
1046    /// Remove tags from a context item.
1047    ///
1048    /// # Errors
1049    ///
1050    /// Returns an error if the update fails.
1051    pub fn remove_tags_from_item(
1052        &mut self,
1053        session_id: &str,
1054        key: &str,
1055        tags_to_remove: &[String],
1056        actor: &str,
1057    ) -> Result<()> {
1058        self.mutate("remove_tags_from_item", actor, |tx, ctx| {
1059            let now = chrono::Utc::now().timestamp_millis();
1060
1061            // Get current tags
1062            let (item_id, current_tags): (String, String) = tx.query_row(
1063                "SELECT id, tags FROM context_items WHERE session_id = ?1 AND key = ?2",
1064                rusqlite::params![session_id, key],
1065                |row| Ok((row.get(0)?, row.get::<_, Option<String>>(1)?.unwrap_or_else(|| "[]".to_string()))),
1066            )?;
1067
1068            // Parse current tags
1069            let mut tags: Vec<String> = serde_json::from_str(&current_tags).unwrap_or_default();
1070
1071            // Remove specified tags
1072            tags.retain(|t| !tags_to_remove.contains(t));
1073
1074            // Update
1075            let new_tags = serde_json::to_string(&tags).unwrap_or_else(|_| "[]".to_string());
1076            tx.execute(
1077                "UPDATE context_items SET tags = ?1, updated_at = ?2 WHERE id = ?3",
1078                rusqlite::params![new_tags, now, item_id],
1079            )?;
1080
1081            ctx.record_event("context_item", &item_id, EventType::ItemUpdated);
1082
1083            Ok(())
1084        })
1085    }
1086
1087    // ================
1088    // Issue Operations
1089    // ================
1090
1091    /// Create a new issue.
1092    ///
1093    /// # Errors
1094    ///
1095    /// Returns an error if the insert fails.
1096    #[allow(clippy::too_many_arguments)]
1097    pub fn create_issue(
1098        &mut self,
1099        id: &str,
1100        short_id: Option<&str>,
1101        project_path: &str,
1102        title: &str,
1103        description: Option<&str>,
1104        details: Option<&str>,
1105        issue_type: Option<&str>,
1106        priority: Option<i32>,
1107        plan_id: Option<&str>,
1108        actor: &str,
1109    ) -> Result<()> {
1110        let now = chrono::Utc::now().timestamp_millis();
1111        let issue_type = issue_type.unwrap_or("task");
1112        let priority = priority.unwrap_or(2);
1113
1114        self.mutate("create_issue", actor, |tx, ctx| {
1115            tx.execute(
1116                "INSERT INTO issues (id, short_id, project_path, title, description, details, issue_type, priority, plan_id, status, created_by_agent, created_at, updated_at)
1117                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, 'open', ?10, ?11, ?11)",
1118                rusqlite::params![id, short_id, project_path, title, description, details, issue_type, priority, plan_id, actor, now],
1119            )?;
1120
1121            ctx.record_event("issue", id, EventType::IssueCreated);
1122            ctx.mark_issue_dirty(id);
1123
1124            Ok(())
1125        })
1126    }
1127
1128    /// Get an issue by ID (full ID or short ID).
1129    ///
1130    /// # Errors
1131    ///
1132    /// Returns an error if the query fails.
1133    pub fn get_issue(&self, id: &str, project_path: Option<&str>) -> Result<Option<Issue>> {
1134        // Try full ID first, then short ID
1135        let sql = if project_path.is_some() {
1136            "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1137             FROM issues WHERE (id = ?1 OR short_id = ?1) AND project_path = ?2"
1138        } else {
1139            "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1140             FROM issues WHERE id = ?1 OR short_id = ?1"
1141        };
1142
1143        let mut stmt = self.conn.prepare(sql)?;
1144
1145        let issue = if let Some(path) = project_path {
1146            stmt.query_row(rusqlite::params![id, path], map_issue_row)
1147        } else {
1148            stmt.query_row([id], map_issue_row)
1149        }
1150        .optional()?;
1151
1152        Ok(issue)
1153    }
1154
1155    /// List issues with filters.
1156    ///
1157    /// # Errors
1158    ///
1159    /// Returns an error if the query fails.
1160    pub fn list_issues(
1161        &self,
1162        project_path: &str,
1163        status: Option<&str>,
1164        issue_type: Option<&str>,
1165        limit: Option<u32>,
1166    ) -> Result<Vec<Issue>> {
1167        let limit = limit.unwrap_or(50);
1168
1169        let mut sql = String::from(
1170            "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1171             FROM issues WHERE project_path = ?1",
1172        );
1173
1174        let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(project_path.to_string())];
1175
1176        if let Some(st) = status {
1177            if st != "all" {
1178                sql.push_str(" AND status = ?");
1179                params.push(Box::new(st.to_string()));
1180            }
1181        } else {
1182            // Default: exclude closed
1183            sql.push_str(" AND status != 'closed'");
1184        }
1185
1186        if let Some(t) = issue_type {
1187            sql.push_str(" AND issue_type = ?");
1188            params.push(Box::new(t.to_string()));
1189        }
1190
1191        sql.push_str(" ORDER BY priority DESC, created_at ASC LIMIT ?");
1192        params.push(Box::new(limit));
1193
1194        let mut stmt = self.conn.prepare(&sql)?;
1195        let params_refs: Vec<&dyn rusqlite::ToSql> = params
1196            .iter()
1197            .map(|b| b.as_ref())
1198            .collect();
1199
1200        let rows = stmt.query_map(params_refs.as_slice(), map_issue_row)?;
1201
1202        rows.collect::<std::result::Result<Vec<_>, _>>()
1203            .map_err(Error::from)
1204    }
1205
1206    /// List issues across all projects.
1207    ///
1208    /// # Errors
1209    ///
1210    /// Returns an error if the query fails.
1211    pub fn list_all_issues(
1212        &self,
1213        status: Option<&str>,
1214        issue_type: Option<&str>,
1215        limit: Option<u32>,
1216    ) -> Result<Vec<Issue>> {
1217        let limit = limit.unwrap_or(50);
1218
1219        let mut sql = String::from(
1220            "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1221             FROM issues WHERE 1=1",
1222        );
1223
1224        let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
1225
1226        if let Some(st) = status {
1227            if st != "all" {
1228                sql.push_str(" AND status = ?");
1229                params.push(Box::new(st.to_string()));
1230            }
1231        } else {
1232            // Default: exclude closed
1233            sql.push_str(" AND status != 'closed'");
1234        }
1235
1236        if let Some(t) = issue_type {
1237            sql.push_str(" AND issue_type = ?");
1238            params.push(Box::new(t.to_string()));
1239        }
1240
1241        sql.push_str(" ORDER BY priority DESC, created_at ASC LIMIT ?");
1242        params.push(Box::new(limit));
1243
1244        let mut stmt = self.conn.prepare(&sql)?;
1245        let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1246
1247        let rows = stmt.query_map(params_refs.as_slice(), map_issue_row)?;
1248
1249        rows.collect::<std::result::Result<Vec<_>, _>>()
1250            .map_err(Error::from)
1251    }
1252
1253    /// Update issue status.
1254    ///
1255    /// Accepts either full ID or short_id.
1256    ///
1257    /// # Errors
1258    ///
1259    /// Returns an error if the update fails.
1260    pub fn update_issue_status(
1261        &mut self,
1262        id: &str,
1263        status: &str,
1264        actor: &str,
1265    ) -> Result<()> {
1266        let now = chrono::Utc::now().timestamp_millis();
1267        let closed_at = if status == "closed" { Some(now) } else { None };
1268
1269        self.mutate("update_issue_status", actor, |tx, ctx| {
1270            let rows = tx.execute(
1271                "UPDATE issues SET status = ?1, closed_at = ?2, closed_by_agent = ?3, updated_at = ?4 WHERE id = ?5 OR short_id = ?5",
1272                rusqlite::params![status, closed_at, if status == "closed" { Some(actor) } else { None }, now, id],
1273            )?;
1274
1275            if rows == 0 {
1276                return Err(Error::IssueNotFound { id: id.to_string() });
1277            }
1278
1279            let event_type = if status == "closed" {
1280                EventType::IssueClosed
1281            } else {
1282                EventType::IssueUpdated
1283            };
1284            ctx.record_event("issue", id, event_type);
1285            ctx.mark_issue_dirty(id);
1286
1287            Ok(())
1288        })
1289    }
1290
1291    /// Update issue fields (title, description, details, priority, issue_type).
1292    ///
1293    /// Only updates fields that are Some. Status is handled separately.
1294    ///
1295    /// # Errors
1296    ///
1297    /// Returns an error if the update fails.
1298    #[allow(clippy::too_many_arguments)]
1299    pub fn update_issue(
1300        &mut self,
1301        id: &str,
1302        title: Option<&str>,
1303        description: Option<&str>,
1304        details: Option<&str>,
1305        priority: Option<i32>,
1306        issue_type: Option<&str>,
1307        plan_id: Option<&str>,
1308        parent_id: Option<&str>,
1309        actor: &str,
1310    ) -> Result<()> {
1311        let now = chrono::Utc::now().timestamp_millis();
1312
1313        // Build dynamic UPDATE query based on provided fields
1314        let mut set_clauses = vec!["updated_at = ?"];
1315        let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1316
1317        if let Some(t) = title {
1318            set_clauses.push("title = ?");
1319            params.push(Box::new(t.to_string()));
1320        }
1321        if let Some(d) = description {
1322            set_clauses.push("description = ?");
1323            params.push(Box::new(d.to_string()));
1324        }
1325        if let Some(dt) = details {
1326            set_clauses.push("details = ?");
1327            params.push(Box::new(dt.to_string()));
1328        }
1329        if let Some(p) = priority {
1330            set_clauses.push("priority = ?");
1331            params.push(Box::new(p));
1332        }
1333        if let Some(it) = issue_type {
1334            set_clauses.push("issue_type = ?");
1335            params.push(Box::new(it.to_string()));
1336        }
1337        if let Some(pid) = plan_id {
1338            set_clauses.push("plan_id = ?");
1339            params.push(Box::new(pid.to_string()));
1340        }
1341
1342        // Only updated_at - no actual changes
1343        if set_clauses.len() == 1 && parent_id.is_none() {
1344            return Ok(());
1345        }
1346
1347        self.mutate("update_issue", actor, |tx, ctx| {
1348            // Update the issue fields
1349            if set_clauses.len() > 1 {
1350                let sql = format!(
1351                    "UPDATE issues SET {} WHERE id = ? OR short_id = ?",
1352                    set_clauses.join(", ")
1353                );
1354                params.push(Box::new(id.to_string()));
1355                params.push(Box::new(id.to_string()));
1356
1357                let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
1358                let rows = tx.execute(&sql, param_refs.as_slice())?;
1359
1360                if rows == 0 {
1361                    return Err(Error::IssueNotFound { id: id.to_string() });
1362                }
1363            }
1364
1365            // Handle parent_id change via dependency
1366            if let Some(new_parent) = parent_id {
1367                // First, get the full ID
1368                let full_id: String = tx.query_row(
1369                    "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1370                    [id],
1371                    |row| row.get(0),
1372                )?;
1373
1374                // Remove existing parent-child dependency
1375                tx.execute(
1376                    "DELETE FROM issue_dependencies WHERE issue_id = ?1 AND dependency_type = 'parent-child'",
1377                    [&full_id],
1378                )?;
1379
1380                // Add new parent-child dependency if not empty
1381                if !new_parent.is_empty() {
1382                    let parent_full_id: String = tx.query_row(
1383                        "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1384                        [new_parent],
1385                        |row| row.get(0),
1386                    )?;
1387
1388                    tx.execute(
1389                        "INSERT INTO issue_dependencies (issue_id, depends_on_id, dependency_type, created_at)
1390                         VALUES (?1, ?2, 'parent-child', ?3)",
1391                        rusqlite::params![full_id, parent_full_id, now],
1392                    )?;
1393                }
1394            }
1395
1396            ctx.record_event("issue", id, EventType::IssueUpdated);
1397            ctx.mark_issue_dirty(id);
1398
1399            Ok(())
1400        })
1401    }
1402
1403    /// Claim an issue (assign to agent).
1404    ///
1405    /// Accepts either full ID or short_id.
1406    ///
1407    /// # Errors
1408    ///
1409    /// Returns an error if the claim fails.
1410    pub fn claim_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1411        let now = chrono::Utc::now().timestamp_millis();
1412
1413        self.mutate("claim_issue", actor, |tx, ctx| {
1414            let rows = tx.execute(
1415                "UPDATE issues SET assigned_to_agent = ?1, assigned_at = ?2, status = 'in_progress', updated_at = ?2 WHERE id = ?3 OR short_id = ?3",
1416                rusqlite::params![actor, now, id],
1417            )?;
1418
1419            if rows == 0 {
1420                return Err(Error::IssueNotFound { id: id.to_string() });
1421            }
1422
1423            ctx.record_event("issue", id, EventType::IssueClaimed);
1424            ctx.mark_issue_dirty(id);
1425
1426            Ok(())
1427        })
1428    }
1429
1430    /// Release an issue (unassign).
1431    ///
1432    /// Accepts either full ID or short_id.
1433    ///
1434    /// # Errors
1435    ///
1436    /// Returns an error if the release fails.
1437    pub fn release_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1438        let now = chrono::Utc::now().timestamp_millis();
1439
1440        self.mutate("release_issue", actor, |tx, ctx| {
1441            let rows = tx.execute(
1442                "UPDATE issues SET assigned_to_agent = NULL, assigned_at = NULL, status = 'open', updated_at = ?1 WHERE id = ?2 OR short_id = ?2",
1443                rusqlite::params![now, id],
1444            )?;
1445
1446            if rows == 0 {
1447                return Err(Error::IssueNotFound { id: id.to_string() });
1448            }
1449
1450            ctx.record_event("issue", id, EventType::IssueReleased);
1451            ctx.mark_issue_dirty(id);
1452
1453            Ok(())
1454        })
1455    }
1456
1457    /// Delete an issue.
1458    ///
1459    /// Accepts either full ID or short_id.
1460    ///
1461    /// # Errors
1462    ///
1463    /// Returns an error if the delete fails.
1464    pub fn delete_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1465        self.mutate("delete_issue", actor, |tx, ctx| {
1466            // First get the full issue ID and project_path
1467            let info: Option<(String, String)> = tx
1468                .query_row(
1469                    "SELECT id, project_path FROM issues WHERE id = ?1 OR short_id = ?1",
1470                    [id],
1471                    |row| Ok((row.get(0)?, row.get(1)?)),
1472                )
1473                .optional()?;
1474
1475            let (full_id, project_path) =
1476                info.ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1477
1478            // Delete dependencies using full ID
1479            tx.execute(
1480                "DELETE FROM issue_dependencies WHERE issue_id = ?1 OR depends_on_id = ?1",
1481                [&full_id],
1482            )?;
1483
1484            // Delete the issue
1485            let rows = tx.execute("DELETE FROM issues WHERE id = ?1", [&full_id])?;
1486
1487            if rows == 0 {
1488                return Err(Error::IssueNotFound { id: id.to_string() });
1489            }
1490
1491            ctx.record_event("issue", &full_id, EventType::IssueDeleted);
1492
1493            // Record for sync export
1494            let now = chrono::Utc::now().timestamp_millis();
1495            tx.execute(
1496                "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
1497                 VALUES ('issue', ?1, ?2, ?3, ?4, 0)
1498                 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
1499                   deleted_at = excluded.deleted_at,
1500                   deleted_by = excluded.deleted_by,
1501                   exported = 0",
1502                rusqlite::params![full_id, project_path, now, ctx.actor],
1503            )?;
1504
1505            Ok(())
1506        })
1507    }
1508
1509    /// Add labels to an issue.
1510    ///
1511    /// # Errors
1512    ///
1513    /// Returns an error if the operation fails.
1514    pub fn add_issue_labels(&mut self, id: &str, labels: &[String], actor: &str) -> Result<()> {
1515        self.mutate("add_issue_labels", actor, |tx, ctx| {
1516            // Get full issue ID
1517            let full_id: String = tx
1518                .query_row(
1519                    "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1520                    [id],
1521                    |row| row.get(0),
1522                )
1523                .optional()?
1524                .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1525
1526            for label in labels {
1527                let label_id = format!("label_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1528                tx.execute(
1529                    "INSERT OR IGNORE INTO issue_labels (id, issue_id, label) VALUES (?1, ?2, ?3)",
1530                    rusqlite::params![label_id, full_id, label],
1531                )?;
1532            }
1533
1534            ctx.record_event("issue", &full_id, EventType::IssueUpdated);
1535            Ok(())
1536        })
1537    }
1538
1539    /// Remove labels from an issue.
1540    ///
1541    /// # Errors
1542    ///
1543    /// Returns an error if the operation fails.
1544    pub fn remove_issue_labels(&mut self, id: &str, labels: &[String], actor: &str) -> Result<()> {
1545        self.mutate("remove_issue_labels", actor, |tx, ctx| {
1546            // Get full issue ID
1547            let full_id: String = tx
1548                .query_row(
1549                    "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1550                    [id],
1551                    |row| row.get(0),
1552                )
1553                .optional()?
1554                .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1555
1556            for label in labels {
1557                tx.execute(
1558                    "DELETE FROM issue_labels WHERE issue_id = ?1 AND label = ?2",
1559                    rusqlite::params![full_id, label],
1560                )?;
1561            }
1562
1563            ctx.record_event("issue", &full_id, EventType::IssueUpdated);
1564            Ok(())
1565        })
1566    }
1567
1568    /// Get labels for an issue.
1569    ///
1570    /// # Errors
1571    ///
1572    /// Returns an error if the query fails.
1573    pub fn get_issue_labels(&self, id: &str) -> Result<Vec<String>> {
1574        let full_id: String = self
1575            .conn
1576            .query_row(
1577                "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1578                [id],
1579                |row| row.get(0),
1580            )
1581            .optional()?
1582            .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1583
1584        let mut stmt = self
1585            .conn
1586            .prepare("SELECT label FROM issue_labels WHERE issue_id = ?1 ORDER BY label")?;
1587        let labels = stmt
1588            .query_map([&full_id], |row| row.get(0))?
1589            .collect::<std::result::Result<Vec<String>, _>>()?;
1590        Ok(labels)
1591    }
1592
1593    /// Check if an issue has any dependencies (depends on other issues).
1594    pub fn issue_has_dependencies(&self, id: &str) -> Result<bool> {
1595        let full_id: String = self
1596            .conn
1597            .query_row(
1598                "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1599                [id],
1600                |row| row.get(0),
1601            )
1602            .optional()?
1603            .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1604
1605        let count: i64 = self.conn.query_row(
1606            "SELECT COUNT(*) FROM issue_dependencies WHERE issue_id = ?1",
1607            [&full_id],
1608            |row| row.get(0),
1609        )?;
1610        Ok(count > 0)
1611    }
1612
1613    /// Check if an issue has any subtasks (child issues via parent-child dependency).
1614    pub fn issue_has_subtasks(&self, id: &str) -> Result<bool> {
1615        let full_id: String = self
1616            .conn
1617            .query_row(
1618                "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1619                [id],
1620                |row| row.get(0),
1621            )
1622            .optional()?
1623            .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1624
1625        let count: i64 = self.conn.query_row(
1626            "SELECT COUNT(*) FROM issue_dependencies WHERE depends_on_id = ?1 AND dependency_type = 'parent-child'",
1627            [&full_id],
1628            |row| row.get(0),
1629        )?;
1630        Ok(count > 0)
1631    }
1632
1633    /// Get the set of issue IDs that are children of a specific parent.
1634    ///
1635    /// Returns IDs of issues that have a parent-child dependency on the given parent ID.
1636    pub fn get_child_issue_ids(&self, parent_id: &str) -> Result<std::collections::HashSet<String>> {
1637        // Resolve parent ID (handle short IDs)
1638        let full_parent_id: String = self
1639            .conn
1640            .query_row(
1641                "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1642                [parent_id],
1643                |row| row.get(0),
1644            )
1645            .optional()?
1646            .ok_or_else(|| Error::IssueNotFound { id: parent_id.to_string() })?;
1647
1648        let mut stmt = self.conn.prepare(
1649            "SELECT issue_id FROM issue_dependencies
1650             WHERE depends_on_id = ?1 AND dependency_type = 'parent-child'",
1651        )?;
1652
1653        let rows = stmt.query_map([&full_parent_id], |row| row.get::<_, String>(0))?;
1654
1655        let mut ids = std::collections::HashSet::new();
1656        for row in rows {
1657            ids.insert(row?);
1658        }
1659        Ok(ids)
1660    }
1661
1662    /// Add a dependency between issues.
1663    ///
1664    /// # Errors
1665    ///
1666    /// Returns an error if the operation fails.
1667    pub fn add_issue_dependency(
1668        &mut self,
1669        issue_id: &str,
1670        depends_on_id: &str,
1671        dependency_type: &str,
1672        actor: &str,
1673    ) -> Result<()> {
1674        self.mutate("add_issue_dependency", actor, |tx, ctx| {
1675            // Get full issue IDs
1676            let full_issue_id: String = tx
1677                .query_row(
1678                    "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1679                    [issue_id],
1680                    |row| row.get(0),
1681                )
1682                .optional()?
1683                .ok_or_else(|| Error::IssueNotFound {
1684                    id: issue_id.to_string(),
1685                })?;
1686
1687            let full_depends_on_id: String = tx
1688                .query_row(
1689                    "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1690                    [depends_on_id],
1691                    |row| row.get(0),
1692                )
1693                .optional()?
1694                .ok_or_else(|| Error::IssueNotFound {
1695                    id: depends_on_id.to_string(),
1696                })?;
1697
1698            let dep_id = format!("dep_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1699            let now = chrono::Utc::now().timestamp_millis();
1700
1701            tx.execute(
1702                "INSERT OR IGNORE INTO issue_dependencies (id, issue_id, depends_on_id, dependency_type, created_at)
1703                 VALUES (?1, ?2, ?3, ?4, ?5)",
1704                rusqlite::params![dep_id, full_issue_id, full_depends_on_id, dependency_type, now],
1705            )?;
1706
1707            ctx.record_event("issue", &full_issue_id, EventType::IssueUpdated);
1708            Ok(())
1709        })
1710    }
1711
1712    /// Remove a dependency between issues.
1713    ///
1714    /// # Errors
1715    ///
1716    /// Returns an error if the operation fails.
1717    pub fn remove_issue_dependency(
1718        &mut self,
1719        issue_id: &str,
1720        depends_on_id: &str,
1721        actor: &str,
1722    ) -> Result<()> {
1723        self.mutate("remove_issue_dependency", actor, |tx, ctx| {
1724            // Get full issue IDs
1725            let full_issue_id: String = tx
1726                .query_row(
1727                    "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1728                    [issue_id],
1729                    |row| row.get(0),
1730                )
1731                .optional()?
1732                .ok_or_else(|| Error::IssueNotFound {
1733                    id: issue_id.to_string(),
1734                })?;
1735
1736            let full_depends_on_id: String = tx
1737                .query_row(
1738                    "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1739                    [depends_on_id],
1740                    |row| row.get(0),
1741                )
1742                .optional()?
1743                .ok_or_else(|| Error::IssueNotFound {
1744                    id: depends_on_id.to_string(),
1745                })?;
1746
1747            tx.execute(
1748                "DELETE FROM issue_dependencies WHERE issue_id = ?1 AND depends_on_id = ?2",
1749                rusqlite::params![full_issue_id, full_depends_on_id],
1750            )?;
1751
1752            ctx.record_event("issue", &full_issue_id, EventType::IssueUpdated);
1753            Ok(())
1754        })
1755    }
1756
1757    /// Clone an issue.
1758    ///
1759    /// # Errors
1760    ///
1761    /// Returns an error if the operation fails.
1762    pub fn clone_issue(
1763        &mut self,
1764        id: &str,
1765        new_title: Option<&str>,
1766        actor: &str,
1767    ) -> Result<Issue> {
1768        // First get the source issue
1769        let source = self
1770            .get_issue(id, None)?
1771            .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1772
1773        let new_id = format!("issue_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1774        let new_short_id = generate_short_id();
1775        let default_title = format!("Copy of {}", source.title);
1776        let title = new_title.unwrap_or(&default_title);
1777        let now = chrono::Utc::now().timestamp_millis();
1778
1779        self.mutate("clone_issue", actor, |tx, ctx| {
1780            tx.execute(
1781                "INSERT INTO issues (id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, created_at, updated_at)
1782                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'open', ?7, ?8, ?9, ?10, ?11, ?11)",
1783                rusqlite::params![
1784                    new_id,
1785                    new_short_id,
1786                    source.project_path,
1787                    title,
1788                    source.description,
1789                    source.details,
1790                    source.priority,
1791                    source.issue_type,
1792                    source.plan_id,
1793                    ctx.actor,
1794                    now
1795                ],
1796            )?;
1797
1798            // Copy labels
1799            let labels: Vec<String> = tx
1800                .prepare("SELECT label FROM issue_labels WHERE issue_id = ?1")?
1801                .query_map([&source.id], |row| row.get(0))?
1802                .collect::<std::result::Result<Vec<String>, _>>()?;
1803
1804            for label in &labels {
1805                let label_id = format!("label_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1806                tx.execute(
1807                    "INSERT INTO issue_labels (id, issue_id, label) VALUES (?1, ?2, ?3)",
1808                    rusqlite::params![label_id, new_id, label],
1809                )?;
1810            }
1811
1812            ctx.record_event("issue", &new_id, EventType::IssueCreated);
1813            Ok(())
1814        })?;
1815
1816        // Return the new issue
1817        self.get_issue(&new_id, None)?
1818            .ok_or_else(|| Error::Other("Failed to retrieve cloned issue".to_string()))
1819    }
1820
1821    /// Mark an issue as a duplicate of another.
1822    ///
1823    /// # Errors
1824    ///
1825    /// Returns an error if the operation fails.
1826    pub fn mark_issue_duplicate(
1827        &mut self,
1828        id: &str,
1829        duplicate_of_id: &str,
1830        actor: &str,
1831    ) -> Result<()> {
1832        // Add duplicate-of dependency
1833        self.add_issue_dependency(id, duplicate_of_id, "duplicate-of", actor)?;
1834
1835        // Close the issue
1836        self.update_issue_status(id, "closed", actor)?;
1837
1838        Ok(())
1839    }
1840
1841    /// Get issues that are ready to work on (open, no blocking dependencies, not assigned).
1842    ///
1843    /// # Errors
1844    ///
1845    /// Returns an error if the query fails.
1846    pub fn get_ready_issues(&self, project_path: &str, limit: u32) -> Result<Vec<Issue>> {
1847        let mut stmt = self.conn.prepare(
1848            "SELECT i.id, i.short_id, i.project_path, i.title, i.description, i.details,
1849                    i.status, i.priority, i.issue_type, i.plan_id, i.created_by_agent,
1850                    i.assigned_to_agent, i.created_at, i.updated_at, i.closed_at
1851             FROM issues i
1852             WHERE i.project_path = ?1
1853               AND i.status = 'open'
1854               AND i.assigned_to_agent IS NULL
1855               AND NOT EXISTS (
1856                   SELECT 1 FROM issue_dependencies d
1857                   JOIN issues dep ON dep.id = d.depends_on_id
1858                   WHERE d.issue_id = i.id
1859                     AND d.dependency_type = 'blocks'
1860                     AND dep.status != 'closed'
1861               )
1862             ORDER BY i.priority DESC, i.created_at ASC
1863             LIMIT ?2",
1864        )?;
1865
1866        let issues = stmt
1867            .query_map(rusqlite::params![project_path, limit], |row| {
1868                Ok(Issue {
1869                    id: row.get(0)?,
1870                    short_id: row.get(1)?,
1871                    project_path: row.get(2)?,
1872                    title: row.get(3)?,
1873                    description: row.get(4)?,
1874                    details: row.get(5)?,
1875                    status: row.get(6)?,
1876                    priority: row.get(7)?,
1877                    issue_type: row.get(8)?,
1878                    plan_id: row.get(9)?,
1879                    created_by_agent: row.get(10)?,
1880                    assigned_to_agent: row.get(11)?,
1881                    created_at: row.get(12)?,
1882                    updated_at: row.get(13)?,
1883                    closed_at: row.get(14)?,
1884                })
1885            })?
1886            .collect::<std::result::Result<Vec<_>, _>>()?;
1887
1888        Ok(issues)
1889    }
1890
1891    /// Get and claim next block of ready issues.
1892    ///
1893    /// # Errors
1894    ///
1895    /// Returns an error if the operation fails.
1896    pub fn get_next_issue_block(
1897        &mut self,
1898        project_path: &str,
1899        count: u32,
1900        actor: &str,
1901    ) -> Result<Vec<Issue>> {
1902        let ready = self.get_ready_issues(project_path, count)?;
1903
1904        for issue in &ready {
1905            self.claim_issue(&issue.id, actor)?;
1906        }
1907
1908        // Return claimed issues with updated status
1909        let claimed: Vec<Issue> = ready
1910            .iter()
1911            .filter_map(|i| self.get_issue(&i.id, None).ok().flatten())
1912            .collect();
1913
1914        Ok(claimed)
1915    }
1916
1917    // ======================
1918    // Issue Analytics
1919    // ======================
1920
1921    /// Count issues grouped by a field (status, type, priority, assignee).
1922    pub fn count_issues_grouped(
1923        &self,
1924        project_path: &str,
1925        group_by: &str,
1926    ) -> Result<Vec<(String, i64)>> {
1927        let column = match group_by {
1928            "status" => "status",
1929            "type" => "issue_type",
1930            "priority" => "CAST(priority AS TEXT)",
1931            "assignee" => "COALESCE(assigned_to_agent, 'unassigned')",
1932            _ => return Err(Error::InvalidArgument(
1933                format!("Invalid group_by '{group_by}'. Valid: status, type, priority, assignee")
1934            )),
1935        };
1936
1937        let sql = format!(
1938            "SELECT {column}, COUNT(*) as count FROM issues \
1939             WHERE project_path = ?1 GROUP BY {column} ORDER BY count DESC"
1940        );
1941
1942        let mut stmt = self.conn.prepare(&sql)?;
1943        let rows = stmt.query_map([project_path], |row| {
1944            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1945        })?;
1946
1947        Ok(rows.collect::<std::result::Result<Vec<_>, _>>()?)
1948    }
1949
1950    /// Get stale issues (not updated in N days).
1951    pub fn get_stale_issues(
1952        &self,
1953        project_path: &str,
1954        stale_days: u64,
1955        limit: u32,
1956    ) -> Result<Vec<Issue>> {
1957        let cutoff_ms = chrono::Utc::now().timestamp_millis()
1958            - (stale_days as i64 * 24 * 60 * 60 * 1000);
1959
1960        let mut stmt = self.conn.prepare(
1961            "SELECT id, short_id, project_path, title, description, details,
1962                    status, priority, issue_type, plan_id, created_by_agent,
1963                    assigned_to_agent, created_at, updated_at, closed_at
1964             FROM issues
1965             WHERE project_path = ?1
1966               AND status IN ('open', 'in_progress', 'blocked')
1967               AND updated_at < ?2
1968             ORDER BY updated_at ASC
1969             LIMIT ?3",
1970        )?;
1971
1972        let issues = stmt
1973            .query_map(rusqlite::params![project_path, cutoff_ms, limit], map_issue_row)?
1974            .collect::<std::result::Result<Vec<_>, _>>()?;
1975
1976        Ok(issues)
1977    }
1978
1979    /// Get blocked issues with their blockers.
1980    pub fn get_blocked_issues(
1981        &self,
1982        project_path: &str,
1983        limit: u32,
1984    ) -> Result<Vec<(Issue, Vec<Issue>)>> {
1985        let mut stmt = self.conn.prepare(
1986            "SELECT i.id, i.short_id, i.project_path, i.title, i.description, i.details,
1987                    i.status, i.priority, i.issue_type, i.plan_id, i.created_by_agent,
1988                    i.assigned_to_agent, i.created_at, i.updated_at, i.closed_at
1989             FROM issues i
1990             WHERE i.project_path = ?1
1991               AND i.status NOT IN ('closed', 'deferred')
1992               AND EXISTS (
1993                   SELECT 1 FROM issue_dependencies d
1994                   JOIN issues dep ON dep.id = d.depends_on_id
1995                   WHERE d.issue_id = i.id
1996                     AND d.dependency_type = 'blocks'
1997                     AND dep.status != 'closed'
1998               )
1999             ORDER BY i.priority DESC, i.created_at ASC
2000             LIMIT ?2",
2001        )?;
2002
2003        let blocked_issues = stmt
2004            .query_map(rusqlite::params![project_path, limit], map_issue_row)?
2005            .collect::<std::result::Result<Vec<_>, _>>()?;
2006
2007        let mut blocker_stmt = self.conn.prepare(
2008            "SELECT dep.id, dep.short_id, dep.project_path, dep.title, dep.description, dep.details,
2009                    dep.status, dep.priority, dep.issue_type, dep.plan_id, dep.created_by_agent,
2010                    dep.assigned_to_agent, dep.created_at, dep.updated_at, dep.closed_at
2011             FROM issue_dependencies d
2012             JOIN issues dep ON dep.id = d.depends_on_id
2013             WHERE d.issue_id = ?1
2014               AND d.dependency_type = 'blocks'
2015               AND dep.status != 'closed'",
2016        )?;
2017
2018        let mut results = Vec::with_capacity(blocked_issues.len());
2019        for issue in blocked_issues {
2020            let blockers = blocker_stmt
2021                .query_map([&issue.id], map_issue_row)?
2022                .collect::<std::result::Result<Vec<_>, _>>()?;
2023            results.push((issue, blockers));
2024        }
2025
2026        Ok(results)
2027    }
2028
2029    /// Get epic progress (child issue counts by status).
2030    pub fn get_epic_progress(&self, epic_id: &str) -> Result<EpicProgress> {
2031        let mut stmt = self.conn.prepare(
2032            "SELECT child.status, COUNT(*) as count
2033             FROM issue_dependencies d
2034             JOIN issues child ON child.id = d.issue_id
2035             WHERE d.depends_on_id = ?1
2036               AND d.dependency_type = 'parent-child'
2037             GROUP BY child.status",
2038        )?;
2039
2040        let rows = stmt
2041            .query_map([epic_id], |row| {
2042                Ok((row.get::<_, String>(0)?, row.get::<_, usize>(1)?))
2043            })?
2044            .collect::<std::result::Result<Vec<_>, _>>()?;
2045
2046        let mut progress = EpicProgress::default();
2047        for (status, count) in rows {
2048            match status.as_str() {
2049                "closed" => progress.closed += count,
2050                "in_progress" => progress.in_progress += count,
2051                "open" => progress.open += count,
2052                "blocked" => progress.blocked += count,
2053                "deferred" => progress.deferred += count,
2054                _ => progress.open += count,
2055            }
2056            progress.total += count;
2057        }
2058
2059        Ok(progress)
2060    }
2061
2062    /// Get dependency tree starting from a root issue.
2063    /// Returns (issue, depth) pairs in tree order.
2064    pub fn get_dependency_tree(&self, root_id: &str) -> Result<Vec<(Issue, i32)>> {
2065        // First get the root issue
2066        let root = self.get_issue(root_id, None)?
2067            .ok_or_else(|| Error::IssueNotFound { id: root_id.to_string() })?;
2068
2069        let root_full_id = root.id.clone();
2070        let mut result = vec![(root, 0)];
2071        let mut queue = vec![(root_full_id.clone(), 0i32)];
2072        let mut visited = std::collections::HashSet::new();
2073        visited.insert(root_full_id);
2074
2075        let mut child_stmt = self.conn.prepare(
2076            "SELECT child.id, child.short_id, child.project_path, child.title,
2077                    child.description, child.details, child.status, child.priority,
2078                    child.issue_type, child.plan_id, child.created_by_agent,
2079                    child.assigned_to_agent, child.created_at, child.updated_at,
2080                    child.closed_at
2081             FROM issue_dependencies d
2082             JOIN issues child ON child.id = d.issue_id
2083             WHERE d.depends_on_id = ?1
2084               AND d.dependency_type IN ('parent-child', 'blocks')
2085             ORDER BY child.priority DESC, child.created_at ASC",
2086        )?;
2087
2088        while let Some((parent_id, depth)) = queue.pop() {
2089            let children = child_stmt
2090                .query_map([&parent_id], map_issue_row)?
2091                .collect::<std::result::Result<Vec<_>, _>>()?;
2092
2093            for child in children {
2094                if visited.insert(child.id.clone()) {
2095                    let child_id = child.id.clone();
2096                    result.push((child, depth + 1));
2097                    queue.push((child_id, depth + 1));
2098                }
2099            }
2100        }
2101
2102        Ok(result)
2103    }
2104
2105    /// Get all epics for a project.
2106    pub fn get_epics(&self, project_path: &str) -> Result<Vec<Issue>> {
2107        let mut stmt = self.conn.prepare(
2108            "SELECT id, short_id, project_path, title, description, details,
2109                    status, priority, issue_type, plan_id, created_by_agent,
2110                    assigned_to_agent, created_at, updated_at, closed_at
2111             FROM issues
2112             WHERE project_path = ?1
2113               AND issue_type = 'epic'
2114               AND status != 'closed'
2115             ORDER BY priority DESC, created_at ASC",
2116        )?;
2117
2118        let issues = stmt
2119            .query_map([project_path], map_issue_row)?
2120            .collect::<std::result::Result<Vec<_>, _>>()?;
2121
2122        Ok(issues)
2123    }
2124
2125    /// Update close_reason on an issue.
2126    pub fn set_close_reason(
2127        &mut self,
2128        id: &str,
2129        reason: &str,
2130        actor: &str,
2131    ) -> Result<()> {
2132        let now = chrono::Utc::now().timestamp_millis();
2133        self.mutate("set_close_reason", actor, |tx, _ctx| {
2134            let rows = tx.execute(
2135                "UPDATE issues SET close_reason = ?1, updated_at = ?2 WHERE id = ?3 OR short_id = ?3",
2136                rusqlite::params![reason, now, id],
2137            )?;
2138            if rows == 0 {
2139                return Err(Error::IssueNotFound { id: id.to_string() });
2140            }
2141            Ok(())
2142        })
2143    }
2144
2145    /// Get close_reason for an issue.
2146    pub fn get_close_reason(&self, id: &str) -> Result<Option<String>> {
2147        let result = self.conn.query_row(
2148            "SELECT close_reason FROM issues WHERE id = ?1 OR short_id = ?1",
2149            [id],
2150            |row| row.get(0),
2151        );
2152        match result {
2153            Ok(reason) => Ok(reason),
2154            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
2155            Err(e) => Err(e.into()),
2156        }
2157    }
2158
2159    // ======================
2160    // Checkpoint Operations
2161    // ======================
2162
2163    /// Create a checkpoint.
2164    ///
2165    /// # Errors
2166    ///
2167    /// Returns an error if the insert fails.
2168    #[allow(clippy::too_many_arguments)]
2169    pub fn create_checkpoint(
2170        &mut self,
2171        id: &str,
2172        session_id: &str,
2173        name: &str,
2174        description: Option<&str>,
2175        git_status: Option<&str>,
2176        git_branch: Option<&str>,
2177        actor: &str,
2178    ) -> Result<()> {
2179        let now = chrono::Utc::now().timestamp_millis();
2180
2181        self.mutate("create_checkpoint", actor, |tx, ctx| {
2182            tx.execute(
2183                "INSERT INTO checkpoints (id, session_id, name, description, git_status, git_branch, created_at)
2184                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
2185                rusqlite::params![id, session_id, name, description, git_status, git_branch, now],
2186            )?;
2187
2188            ctx.record_event("checkpoint", id, EventType::CheckpointCreated);
2189
2190            Ok(())
2191        })
2192    }
2193
2194    /// Add an item to a checkpoint.
2195    ///
2196    /// # Errors
2197    ///
2198    /// Returns an error if the insert fails.
2199    pub fn add_checkpoint_item(
2200        &mut self,
2201        checkpoint_id: &str,
2202        context_item_id: &str,
2203        actor: &str,
2204    ) -> Result<()> {
2205        let id = format!("cpitem_{}", &uuid::Uuid::new_v4().to_string()[..12]);
2206        self.mutate("add_checkpoint_item", actor, |tx, _ctx| {
2207            tx.execute(
2208                "INSERT OR IGNORE INTO checkpoint_items (id, checkpoint_id, context_item_id)
2209                 VALUES (?1, ?2, ?3)",
2210                rusqlite::params![id, checkpoint_id, context_item_id],
2211            )?;
2212
2213            Ok(())
2214        })
2215    }
2216
2217    /// Count context items created since the most recent checkpoint for a session.
2218    ///
2219    /// Returns 0 if no items exist. If no checkpoint exists, counts all items.
2220    ///
2221    /// # Errors
2222    ///
2223    /// Returns an error if the query fails.
2224    pub fn count_items_since_last_checkpoint(&self, session_id: &str) -> Result<i64> {
2225        let last_checkpoint_time: Option<i64> = self.conn.query_row(
2226            "SELECT MAX(created_at) FROM checkpoints WHERE session_id = ?1",
2227            [session_id],
2228            |row| row.get(0),
2229        )?;
2230
2231        let count = if let Some(ts) = last_checkpoint_time {
2232            self.conn.query_row(
2233                "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND created_at > ?2",
2234                rusqlite::params![session_id, ts],
2235                |row| row.get(0),
2236            )?
2237        } else {
2238            // No checkpoints yet — count all items
2239            self.conn.query_row(
2240                "SELECT COUNT(*) FROM context_items WHERE session_id = ?1",
2241                [session_id],
2242                |row| row.get(0),
2243            )?
2244        };
2245
2246        Ok(count)
2247    }
2248
2249    /// List checkpoints for a session.
2250    ///
2251    /// # Errors
2252    ///
2253    /// Returns an error if the query fails.
2254    pub fn list_checkpoints(
2255        &self,
2256        session_id: &str,
2257        limit: Option<u32>,
2258    ) -> Result<Vec<Checkpoint>> {
2259        let limit = limit.unwrap_or(20);
2260
2261        let mut stmt = self.conn.prepare(
2262            "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
2263                    (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
2264             FROM checkpoints c
2265             WHERE c.session_id = ?1
2266             ORDER BY c.created_at DESC
2267             LIMIT ?2",
2268        )?;
2269
2270        let rows = stmt.query_map(rusqlite::params![session_id, limit], |row| {
2271            Ok(Checkpoint {
2272                id: row.get(0)?,
2273                session_id: row.get(1)?,
2274                name: row.get(2)?,
2275                description: row.get(3)?,
2276                git_status: row.get(4)?,
2277                git_branch: row.get(5)?,
2278                created_at: row.get(6)?,
2279                item_count: row.get(7)?,
2280            })
2281        })?;
2282
2283        rows.collect::<std::result::Result<Vec<_>, _>>()
2284            .map_err(Error::from)
2285    }
2286
2287    /// Get a checkpoint by ID.
2288    ///
2289    /// # Errors
2290    ///
2291    /// Returns an error if the query fails.
2292    pub fn get_checkpoint(&self, id: &str) -> Result<Option<Checkpoint>> {
2293        let mut stmt = self.conn.prepare(
2294            "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
2295                    (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
2296             FROM checkpoints c
2297             WHERE c.id = ?1",
2298        )?;
2299
2300        let checkpoint = stmt
2301            .query_row([id], |row| {
2302                Ok(Checkpoint {
2303                    id: row.get(0)?,
2304                    session_id: row.get(1)?,
2305                    name: row.get(2)?,
2306                    description: row.get(3)?,
2307                    git_status: row.get(4)?,
2308                    git_branch: row.get(5)?,
2309                    created_at: row.get(6)?,
2310                    item_count: row.get(7)?,
2311                })
2312            })
2313            .optional()?;
2314
2315        Ok(checkpoint)
2316    }
2317
2318    /// Delete a checkpoint.
2319    ///
2320    /// # Errors
2321    ///
2322    /// Returns an error if the delete fails.
2323    pub fn delete_checkpoint(&mut self, id: &str, actor: &str) -> Result<()> {
2324        self.mutate("delete_checkpoint", actor, |tx, ctx| {
2325            // Get project_path from session for sync tracking
2326            let project_path: Option<Option<String>> = tx
2327                .query_row(
2328                    "SELECT s.project_path FROM checkpoints c
2329                     JOIN sessions s ON c.session_id = s.id
2330                     WHERE c.id = ?1",
2331                    [id],
2332                    |row| row.get(0),
2333                )
2334                .optional()?;
2335
2336            // Delete checkpoint items first
2337            tx.execute("DELETE FROM checkpoint_items WHERE checkpoint_id = ?1", [id])?;
2338
2339            // Delete the checkpoint
2340            let rows = tx.execute("DELETE FROM checkpoints WHERE id = ?1", [id])?;
2341
2342            if rows == 0 {
2343                return Err(Error::CheckpointNotFound { id: id.to_string() });
2344            }
2345
2346            ctx.record_event("checkpoint", id, EventType::CheckpointDeleted);
2347
2348            // Record for sync export
2349            if let Some(Some(path)) = project_path {
2350                let now = chrono::Utc::now().timestamp_millis();
2351                tx.execute(
2352                    "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2353                     VALUES ('checkpoint', ?1, ?2, ?3, ?4, 0)
2354                     ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2355                       deleted_at = excluded.deleted_at,
2356                       deleted_by = excluded.deleted_by,
2357                       exported = 0",
2358                    rusqlite::params![id, path, now, ctx.actor],
2359                )?;
2360            }
2361
2362            Ok(())
2363        })
2364    }
2365
2366    /// Get context items in a checkpoint.
2367    ///
2368    /// # Errors
2369    ///
2370    /// Returns an error if the query fails.
2371    pub fn get_checkpoint_items(&self, checkpoint_id: &str) -> Result<Vec<ContextItem>> {
2372        let mut stmt = self.conn.prepare(
2373            "SELECT ci.id, ci.session_id, ci.key, ci.value, ci.category, ci.priority,
2374                    ci.channel, ci.tags, ci.size, ci.created_at, ci.updated_at
2375             FROM context_items ci
2376             JOIN checkpoint_items cpi ON cpi.context_item_id = ci.id
2377             WHERE cpi.checkpoint_id = ?1
2378             ORDER BY ci.priority DESC, ci.created_at DESC",
2379        )?;
2380
2381        let rows = stmt.query_map([checkpoint_id], |row| {
2382            Ok(ContextItem {
2383                id: row.get(0)?,
2384                session_id: row.get(1)?,
2385                key: row.get(2)?,
2386                value: row.get(3)?,
2387                category: row.get(4)?,
2388                priority: row.get(5)?,
2389                channel: row.get(6)?,
2390                tags: row.get(7)?,
2391                size: row.get(8)?,
2392                created_at: row.get(9)?,
2393                updated_at: row.get(10)?,
2394            })
2395        })?;
2396
2397        rows.collect::<std::result::Result<Vec<_>, _>>()
2398            .map_err(Error::from)
2399    }
2400
2401    /// Restore a checkpoint to a target session.
2402    ///
2403    /// This clears existing context items in the target session and recreates
2404    /// them from the checkpoint. Optional filters can limit which items are restored.
2405    ///
2406    /// # Errors
2407    ///
2408    /// Returns an error if the restore fails.
2409    pub fn restore_checkpoint(
2410        &mut self,
2411        checkpoint_id: &str,
2412        target_session_id: &str,
2413        restore_categories: Option<&[String]>,
2414        restore_tags: Option<&[String]>,
2415        actor: &str,
2416    ) -> Result<usize> {
2417        // Get items from checkpoint
2418        let mut items = self.get_checkpoint_items(checkpoint_id)?;
2419
2420        // Apply category filter
2421        if let Some(categories) = restore_categories {
2422            items.retain(|item| categories.contains(&item.category));
2423        }
2424
2425        // Apply tag filter
2426        if let Some(tags) = restore_tags {
2427            items.retain(|item| {
2428                // Parse tags from item (stored as JSON array or null)
2429                if let Some(ref item_tags) = item.tags {
2430                    if let Ok(parsed_tags) = serde_json::from_str::<Vec<String>>(item_tags) {
2431                        return tags.iter().any(|t| parsed_tags.contains(t));
2432                    }
2433                }
2434                false
2435            });
2436        }
2437
2438        let now = chrono::Utc::now().timestamp_millis();
2439
2440        self.mutate("restore_checkpoint", actor, |tx, ctx| {
2441            // Clear existing context items in target session
2442            tx.execute(
2443                "DELETE FROM context_items WHERE session_id = ?1",
2444                [target_session_id],
2445            )?;
2446
2447            // Restore items
2448            let mut restored = 0;
2449            for item in &items {
2450                let new_id = uuid::Uuid::new_v4().to_string();
2451                let size = item.value.len() as i64;
2452
2453                tx.execute(
2454                    "INSERT INTO context_items (id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at)
2455                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?10)",
2456                    rusqlite::params![
2457                        new_id,
2458                        target_session_id,
2459                        item.key,
2460                        item.value,
2461                        item.category,
2462                        item.priority,
2463                        item.channel,
2464                        item.tags,
2465                        size,
2466                        now,
2467                    ],
2468                )?;
2469
2470                ctx.record_event("context_item", &new_id, EventType::ItemCreated);
2471                restored += 1;
2472            }
2473
2474            Ok(restored)
2475        })
2476    }
2477
2478    /// Remove an item from a checkpoint by context item ID.
2479    ///
2480    /// # Errors
2481    ///
2482    /// Returns an error if the delete fails.
2483    pub fn remove_checkpoint_item(
2484        &mut self,
2485        checkpoint_id: &str,
2486        context_item_id: &str,
2487        actor: &str,
2488    ) -> Result<()> {
2489        self.mutate("remove_checkpoint_item", actor, |tx, _ctx| {
2490            tx.execute(
2491                "DELETE FROM checkpoint_items WHERE checkpoint_id = ?1 AND context_item_id = ?2",
2492                rusqlite::params![checkpoint_id, context_item_id],
2493            )?;
2494            Ok(())
2495        })
2496    }
2497
2498    /// Add items to checkpoint by their keys (from current session).
2499    ///
2500    /// # Errors
2501    ///
2502    /// Returns an error if the operation fails.
2503    pub fn add_checkpoint_items_by_keys(
2504        &mut self,
2505        checkpoint_id: &str,
2506        session_id: &str,
2507        keys: &[String],
2508        actor: &str,
2509    ) -> Result<usize> {
2510        let mut added = 0;
2511
2512        for key in keys {
2513            // Find context item by key in session
2514            let item_id: Option<String> = self.conn.query_row(
2515                "SELECT id FROM context_items WHERE session_id = ?1 AND key = ?2",
2516                rusqlite::params![session_id, key],
2517                |row| row.get(0),
2518            ).optional()?;
2519
2520            if let Some(id) = item_id {
2521                self.add_checkpoint_item(checkpoint_id, &id, actor)?;
2522                added += 1;
2523            }
2524        }
2525
2526        Ok(added)
2527    }
2528
2529    /// Remove items from checkpoint by their keys.
2530    ///
2531    /// # Errors
2532    ///
2533    /// Returns an error if the operation fails.
2534    pub fn remove_checkpoint_items_by_keys(
2535        &mut self,
2536        checkpoint_id: &str,
2537        keys: &[String],
2538        actor: &str,
2539    ) -> Result<usize> {
2540        let mut removed = 0;
2541
2542        for key in keys {
2543            // Find context item by key in checkpoint
2544            let item_id: Option<String> = self.conn.query_row(
2545                "SELECT ci.id FROM context_items ci
2546                 JOIN checkpoint_items cpi ON cpi.context_item_id = ci.id
2547                 WHERE cpi.checkpoint_id = ?1 AND ci.key = ?2",
2548                rusqlite::params![checkpoint_id, key],
2549                |row| row.get(0),
2550            ).optional()?;
2551
2552            if let Some(id) = item_id {
2553                self.remove_checkpoint_item(checkpoint_id, &id, actor)?;
2554                removed += 1;
2555            }
2556        }
2557
2558        Ok(removed)
2559    }
2560
2561    // =================
2562    // Memory Operations
2563    // =================
2564
2565    /// Save a memory item (project-level persistent storage).
2566    ///
2567    /// # Errors
2568    ///
2569    /// Returns an error if the operation fails.
2570    #[allow(clippy::too_many_arguments)]
2571    pub fn save_memory(
2572        &mut self,
2573        id: &str,
2574        project_path: &str,
2575        key: &str,
2576        value: &str,
2577        category: &str,
2578        actor: &str,
2579    ) -> Result<()> {
2580        let now = chrono::Utc::now().timestamp_millis();
2581
2582        self.mutate("save_memory", actor, |tx, ctx| {
2583            tx.execute(
2584                "INSERT INTO project_memory (id, project_path, key, value, category, created_at, updated_at)
2585                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6)
2586                 ON CONFLICT(project_path, key) DO UPDATE SET
2587                   value = excluded.value,
2588                   category = excluded.category,
2589                   updated_at = excluded.updated_at",
2590                rusqlite::params![id, project_path, key, value, category, now],
2591            )?;
2592
2593            ctx.record_event("memory", id, EventType::MemorySaved);
2594
2595            Ok(())
2596        })
2597    }
2598
2599    /// Get a memory item by key.
2600    ///
2601    /// # Errors
2602    ///
2603    /// Returns an error if the query fails.
2604    pub fn get_memory(&self, project_path: &str, key: &str) -> Result<Option<Memory>> {
2605        let mut stmt = self.conn.prepare(
2606            "SELECT id, project_path, key, value, category, created_at, updated_at
2607             FROM project_memory WHERE project_path = ?1 AND key = ?2",
2608        )?;
2609
2610        let memory = stmt
2611            .query_row(rusqlite::params![project_path, key], |row| {
2612                Ok(Memory {
2613                    id: row.get(0)?,
2614                    project_path: row.get(1)?,
2615                    key: row.get(2)?,
2616                    value: row.get(3)?,
2617                    category: row.get(4)?,
2618                    created_at: row.get(5)?,
2619                    updated_at: row.get(6)?,
2620                })
2621            })
2622            .optional()?;
2623
2624        Ok(memory)
2625    }
2626
2627    /// List memory items for a project.
2628    ///
2629    /// # Errors
2630    ///
2631    /// Returns an error if the query fails.
2632    pub fn list_memory(
2633        &self,
2634        project_path: &str,
2635        category: Option<&str>,
2636    ) -> Result<Vec<Memory>> {
2637        let map_row = |row: &rusqlite::Row| -> rusqlite::Result<Memory> {
2638            Ok(Memory {
2639                id: row.get(0)?,
2640                project_path: row.get(1)?,
2641                key: row.get(2)?,
2642                value: row.get(3)?,
2643                category: row.get(4)?,
2644                created_at: row.get(5)?,
2645                updated_at: row.get(6)?,
2646            })
2647        };
2648
2649        let rows = if let Some(cat) = category {
2650            let mut stmt = self.conn.prepare(
2651                "SELECT id, project_path, key, value, category, created_at, updated_at
2652                 FROM project_memory WHERE project_path = ?1 AND category = ?2
2653                 ORDER BY key ASC",
2654            )?;
2655            stmt.query_map(rusqlite::params![project_path, cat], map_row)?
2656                .collect::<std::result::Result<Vec<_>, _>>()
2657        } else {
2658            let mut stmt = self.conn.prepare(
2659                "SELECT id, project_path, key, value, category, created_at, updated_at
2660                 FROM project_memory WHERE project_path = ?1
2661                 ORDER BY key ASC",
2662            )?;
2663            stmt.query_map(rusqlite::params![project_path], map_row)?
2664                .collect::<std::result::Result<Vec<_>, _>>()
2665        };
2666
2667        rows.map_err(Error::from)
2668    }
2669
2670    /// Delete a memory item.
2671    ///
2672    /// # Errors
2673    ///
2674    /// Returns an error if the delete fails.
2675    pub fn delete_memory(
2676        &mut self,
2677        project_path: &str,
2678        key: &str,
2679        actor: &str,
2680    ) -> Result<()> {
2681        let proj_path = project_path.to_string();
2682        self.mutate("delete_memory", actor, |tx, ctx| {
2683            // Get ID for event
2684            let id: Option<String> = tx
2685                .query_row(
2686                    "SELECT id FROM project_memory WHERE project_path = ?1 AND key = ?2",
2687                    rusqlite::params![proj_path, key],
2688                    |row| row.get(0),
2689                )
2690                .optional()?;
2691
2692            let rows = tx.execute(
2693                "DELETE FROM project_memory WHERE project_path = ?1 AND key = ?2",
2694                rusqlite::params![proj_path, key],
2695            )?;
2696
2697            if rows > 0 {
2698                if let Some(ref mem_id) = id {
2699                    ctx.record_event("memory", mem_id, EventType::MemoryDeleted);
2700
2701                    // Record for sync export
2702                    let now = chrono::Utc::now().timestamp_millis();
2703                    tx.execute(
2704                        "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2705                         VALUES ('memory', ?1, ?2, ?3, ?4, 0)
2706                         ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2707                           deleted_at = excluded.deleted_at,
2708                           deleted_by = excluded.deleted_by,
2709                           exported = 0",
2710                        rusqlite::params![mem_id, proj_path, now, ctx.actor],
2711                    )?;
2712                }
2713            }
2714
2715            Ok(())
2716        })
2717    }
2718
2719    // =======================
2720    // Sync Support Operations
2721    // =======================
2722
2723    /// Get IDs of all dirty sessions (pending export).
2724    ///
2725    /// # Errors
2726    ///
2727    /// Returns an error if the query fails.
2728    pub fn get_dirty_sessions(&self) -> Result<Vec<String>> {
2729        let mut stmt = self.conn.prepare(
2730            "SELECT session_id FROM dirty_sessions ORDER BY marked_at ASC",
2731        )?;
2732        let rows = stmt.query_map([], |row| row.get(0))?;
2733        rows.collect::<std::result::Result<Vec<_>, _>>()
2734            .map_err(Error::from)
2735    }
2736
2737    /// Get IDs of all dirty issues (pending export).
2738    ///
2739    /// # Errors
2740    ///
2741    /// Returns an error if the query fails.
2742    pub fn get_dirty_issues(&self) -> Result<Vec<String>> {
2743        let mut stmt = self.conn.prepare(
2744            "SELECT issue_id FROM dirty_issues ORDER BY marked_at ASC",
2745        )?;
2746        let rows = stmt.query_map([], |row| row.get(0))?;
2747        rows.collect::<std::result::Result<Vec<_>, _>>()
2748            .map_err(Error::from)
2749    }
2750
2751    /// Get IDs of all dirty context items (pending export).
2752    ///
2753    /// # Errors
2754    ///
2755    /// Returns an error if the query fails.
2756    pub fn get_dirty_context_items(&self) -> Result<Vec<String>> {
2757        let mut stmt = self.conn.prepare(
2758            "SELECT item_id FROM dirty_context_items ORDER BY marked_at ASC",
2759        )?;
2760        let rows = stmt.query_map([], |row| row.get(0))?;
2761        rows.collect::<std::result::Result<Vec<_>, _>>()
2762            .map_err(Error::from)
2763    }
2764
2765    /// Clear dirty flags for sessions after successful export.
2766    ///
2767    /// # Errors
2768    ///
2769    /// Returns an error if the delete fails.
2770    pub fn clear_dirty_sessions(&mut self, ids: &[String]) -> Result<()> {
2771        if ids.is_empty() {
2772            return Ok(());
2773        }
2774        let placeholders = vec!["?"; ids.len()].join(",");
2775        let sql = format!("DELETE FROM dirty_sessions WHERE session_id IN ({placeholders})");
2776        let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2777        self.conn.execute(&sql, params.as_slice())?;
2778        Ok(())
2779    }
2780
2781    /// Clear dirty flags for issues after successful export.
2782    ///
2783    /// # Errors
2784    ///
2785    /// Returns an error if the delete fails.
2786    pub fn clear_dirty_issues(&mut self, ids: &[String]) -> Result<()> {
2787        if ids.is_empty() {
2788            return Ok(());
2789        }
2790        let placeholders = vec!["?"; ids.len()].join(",");
2791        let sql = format!("DELETE FROM dirty_issues WHERE issue_id IN ({placeholders})");
2792        let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2793        self.conn.execute(&sql, params.as_slice())?;
2794        Ok(())
2795    }
2796
2797    /// Clear dirty flags for context items after successful export.
2798    ///
2799    /// # Errors
2800    ///
2801    /// Returns an error if the delete fails.
2802    pub fn clear_dirty_context_items(&mut self, ids: &[String]) -> Result<()> {
2803        if ids.is_empty() {
2804            return Ok(());
2805        }
2806        let placeholders = vec!["?"; ids.len()].join(",");
2807        let sql = format!("DELETE FROM dirty_context_items WHERE item_id IN ({placeholders})");
2808        let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2809        self.conn.execute(&sql, params.as_slice())?;
2810        Ok(())
2811    }
2812
2813    /// Get the stored content hash for an entity (for incremental export).
2814    ///
2815    /// # Errors
2816    ///
2817    /// Returns an error if the query fails.
2818    pub fn get_export_hash(&self, entity_type: &str, entity_id: &str) -> Result<Option<String>> {
2819        let mut stmt = self.conn.prepare(
2820            "SELECT content_hash FROM export_hashes WHERE entity_type = ?1 AND entity_id = ?2",
2821        )?;
2822        let hash = stmt
2823            .query_row(rusqlite::params![entity_type, entity_id], |row| row.get(0))
2824            .optional()?;
2825        Ok(hash)
2826    }
2827
2828    /// Store a content hash after successful export.
2829    ///
2830    /// # Errors
2831    ///
2832    /// Returns an error if the upsert fails.
2833    pub fn set_export_hash(&mut self, entity_type: &str, entity_id: &str, hash: &str) -> Result<()> {
2834        let now = chrono::Utc::now().timestamp_millis();
2835        self.conn.execute(
2836            "INSERT INTO export_hashes (entity_type, entity_id, content_hash, exported_at)
2837             VALUES (?1, ?2, ?3, ?4)
2838             ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2839               content_hash = excluded.content_hash,
2840               exported_at = excluded.exported_at",
2841            rusqlite::params![entity_type, entity_id, hash, now],
2842        )?;
2843        Ok(())
2844    }
2845
2846    // ===================
2847    // Deletion Tracking (for sync)
2848    // ===================
2849
2850    /// Record a deletion for sync export.
2851    ///
2852    /// This should be called when an entity is deleted so that the deletion
2853    /// can be exported and applied on other machines.
2854    ///
2855    /// # Errors
2856    ///
2857    /// Returns an error if the insert fails.
2858    pub fn record_deletion(
2859        &mut self,
2860        entity_type: &str,
2861        entity_id: &str,
2862        project_path: &str,
2863        actor: &str,
2864    ) -> Result<()> {
2865        let now = chrono::Utc::now().timestamp_millis();
2866        self.conn.execute(
2867            "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2868             VALUES (?1, ?2, ?3, ?4, ?5, 0)
2869             ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2870               deleted_at = excluded.deleted_at,
2871               deleted_by = excluded.deleted_by,
2872               exported = 0",
2873            rusqlite::params![entity_type, entity_id, project_path, now, actor],
2874        )?;
2875        Ok(())
2876    }
2877
2878    /// Get pending deletions for a project that haven't been exported yet.
2879    ///
2880    /// # Errors
2881    ///
2882    /// Returns an error if the query fails.
2883    pub fn get_pending_deletions(&self, project_path: &str) -> Result<Vec<SyncDeletion>> {
2884        let mut stmt = self.conn.prepare(
2885            "SELECT id, entity_type, entity_id, project_path, deleted_at, deleted_by
2886             FROM sync_deletions
2887             WHERE project_path = ?1 AND exported = 0
2888             ORDER BY deleted_at ASC",
2889        )?;
2890        let rows = stmt.query_map([project_path], |row| {
2891            Ok(SyncDeletion {
2892                id: row.get(0)?,
2893                entity_type: row.get(1)?,
2894                entity_id: row.get(2)?,
2895                project_path: row.get(3)?,
2896                deleted_at: row.get(4)?,
2897                deleted_by: row.get(5)?,
2898            })
2899        })?;
2900        rows.collect::<std::result::Result<Vec<_>, _>>()
2901            .map_err(Error::from)
2902    }
2903
2904    /// Get all deletions for a project (for full export).
2905    ///
2906    /// # Errors
2907    ///
2908    /// Returns an error if the query fails.
2909    pub fn get_all_deletions(&self, project_path: &str) -> Result<Vec<SyncDeletion>> {
2910        let mut stmt = self.conn.prepare(
2911            "SELECT id, entity_type, entity_id, project_path, deleted_at, deleted_by
2912             FROM sync_deletions
2913             WHERE project_path = ?1
2914             ORDER BY deleted_at ASC",
2915        )?;
2916        let rows = stmt.query_map([project_path], |row| {
2917            Ok(SyncDeletion {
2918                id: row.get(0)?,
2919                entity_type: row.get(1)?,
2920                entity_id: row.get(2)?,
2921                project_path: row.get(3)?,
2922                deleted_at: row.get(4)?,
2923                deleted_by: row.get(5)?,
2924            })
2925        })?;
2926        rows.collect::<std::result::Result<Vec<_>, _>>()
2927            .map_err(Error::from)
2928    }
2929
2930    /// Mark deletions as exported.
2931    ///
2932    /// # Errors
2933    ///
2934    /// Returns an error if the update fails.
2935    pub fn mark_deletions_exported(&mut self, ids: &[i64]) -> Result<()> {
2936        if ids.is_empty() {
2937            return Ok(());
2938        }
2939        let placeholders = vec!["?"; ids.len()].join(",");
2940        let sql = format!("UPDATE sync_deletions SET exported = 1 WHERE id IN ({placeholders})");
2941        let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2942        self.conn.execute(&sql, params.as_slice())?;
2943        Ok(())
2944    }
2945
2946    /// Count pending deletions for a project.
2947    ///
2948    /// # Errors
2949    ///
2950    /// Returns an error if the query fails.
2951    pub fn count_pending_deletions(&self, project_path: &str) -> Result<usize> {
2952        let count: i64 = self.conn.query_row(
2953            "SELECT COUNT(*) FROM sync_deletions WHERE project_path = ?1 AND exported = 0",
2954            [project_path],
2955            |row| row.get(0),
2956        )?;
2957        Ok(count as usize)
2958    }
2959
2960    /// Delete entity by ID for import (applies deletion from another machine).
2961    ///
2962    /// # Errors
2963    ///
2964    /// Returns an error if the delete fails.
2965    pub fn apply_deletion(&mut self, entity_type: &str, entity_id: &str) -> Result<bool> {
2966        let sql = match entity_type {
2967            "session" => "DELETE FROM sessions WHERE id = ?1",
2968            "issue" => "DELETE FROM issues WHERE id = ?1",
2969            "context_item" => "DELETE FROM context_items WHERE id = ?1",
2970            "memory" => "DELETE FROM project_memory WHERE id = ?1",
2971            "checkpoint" => "DELETE FROM checkpoints WHERE id = ?1",
2972            _ => return Ok(false),
2973        };
2974        let rows = self.conn.execute(sql, [entity_id])?;
2975        Ok(rows > 0)
2976    }
2977
2978    /// Get all sessions (for full export).
2979    ///
2980    /// # Errors
2981    ///
2982    /// Returns an error if the query fails.
2983    pub fn get_all_sessions(&self) -> Result<Vec<Session>> {
2984        let mut stmt = self.conn.prepare(
2985            "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
2986             FROM sessions ORDER BY created_at ASC",
2987        )?;
2988        let rows = stmt.query_map([], |row| {
2989            Ok(Session {
2990                id: row.get(0)?,
2991                name: row.get(1)?,
2992                description: row.get(2)?,
2993                branch: row.get(3)?,
2994                channel: row.get(4)?,
2995                project_path: row.get(5)?,
2996                status: row.get(6)?,
2997                ended_at: row.get(7)?,
2998                created_at: row.get(8)?,
2999                updated_at: row.get(9)?,
3000            })
3001        })?;
3002        rows.collect::<std::result::Result<Vec<_>, _>>()
3003            .map_err(Error::from)
3004    }
3005
3006    /// Get all issues (for full export).
3007    ///
3008    /// # Errors
3009    ///
3010    /// Returns an error if the query fails.
3011    pub fn get_all_issues(&self) -> Result<Vec<Issue>> {
3012        let mut stmt = self.conn.prepare(
3013            "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
3014             FROM issues ORDER BY created_at ASC",
3015        )?;
3016        let rows = stmt.query_map([], map_issue_row)?;
3017        rows.collect::<std::result::Result<Vec<_>, _>>()
3018            .map_err(Error::from)
3019    }
3020
3021    /// Get all context items (for full export).
3022    ///
3023    /// # Errors
3024    ///
3025    /// Returns an error if the query fails.
3026    pub fn get_all_context_items(
3027        &self,
3028        category: Option<&str>,
3029        priority: Option<&str>,
3030        limit: Option<u32>,
3031    ) -> Result<Vec<ContextItem>> {
3032        let mut sql = String::from(
3033            "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
3034             FROM context_items WHERE 1=1",
3035        );
3036
3037        let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
3038
3039        if let Some(cat) = category {
3040            sql.push_str(" AND category = ?");
3041            params.push(Box::new(cat.to_string()));
3042        }
3043
3044        if let Some(pri) = priority {
3045            sql.push_str(" AND priority = ?");
3046            params.push(Box::new(pri.to_string()));
3047        }
3048
3049        sql.push_str(" ORDER BY created_at DESC");
3050        if let Some(lim) = limit {
3051            sql.push_str(" LIMIT ?");
3052            params.push(Box::new(lim));
3053        }
3054
3055        let mut stmt = self.conn.prepare(&sql)?;
3056        let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
3057
3058        let rows = stmt.query_map(params_refs.as_slice(), |row| {
3059            Ok(ContextItem {
3060                id: row.get(0)?,
3061                session_id: row.get(1)?,
3062                key: row.get(2)?,
3063                value: row.get(3)?,
3064                category: row.get(4)?,
3065                priority: row.get(5)?,
3066                channel: row.get(6)?,
3067                tags: row.get(7)?,
3068                size: row.get(8)?,
3069                created_at: row.get(9)?,
3070                updated_at: row.get(10)?,
3071            })
3072        })?;
3073        rows.collect::<std::result::Result<Vec<_>, _>>()
3074            .map_err(Error::from)
3075    }
3076
3077    /// Get all memory items (for full export).
3078    ///
3079    /// # Errors
3080    ///
3081    /// Returns an error if the query fails.
3082    pub fn get_all_memory(&self) -> Result<Vec<Memory>> {
3083        let mut stmt = self.conn.prepare(
3084            "SELECT id, project_path, key, value, category, created_at, updated_at
3085             FROM project_memory ORDER BY created_at ASC",
3086        )?;
3087        let rows = stmt.query_map([], |row| {
3088            Ok(Memory {
3089                id: row.get(0)?,
3090                project_path: row.get(1)?,
3091                key: row.get(2)?,
3092                value: row.get(3)?,
3093                category: row.get(4)?,
3094                created_at: row.get(5)?,
3095                updated_at: row.get(6)?,
3096            })
3097        })?;
3098        rows.collect::<std::result::Result<Vec<_>, _>>()
3099            .map_err(Error::from)
3100    }
3101
3102    /// Get all issue short IDs (for Levenshtein suggestions).
3103    ///
3104    /// Returns short_ids (e.g. "SC-a1b2") for all issues, used by
3105    /// `find_similar_ids()` when an issue lookup fails.
3106    pub fn get_all_issue_short_ids(&self) -> Result<Vec<String>> {
3107        let mut stmt = self
3108            .conn
3109            .prepare("SELECT short_id FROM issues WHERE short_id IS NOT NULL")?;
3110        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
3111        rows.collect::<std::result::Result<Vec<_>, _>>()
3112            .map_err(Error::from)
3113    }
3114
3115    /// Get all session IDs (for Levenshtein suggestions).
3116    pub fn get_all_session_ids(&self) -> Result<Vec<String>> {
3117        let mut stmt = self.conn.prepare("SELECT id FROM sessions")?;
3118        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
3119        rows.collect::<std::result::Result<Vec<_>, _>>()
3120            .map_err(Error::from)
3121    }
3122
3123    /// Get all checkpoint IDs (for Levenshtein suggestions).
3124    pub fn get_all_checkpoint_ids(&self) -> Result<Vec<String>> {
3125        let mut stmt = self.conn.prepare("SELECT id FROM checkpoints")?;
3126        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
3127        rows.collect::<std::result::Result<Vec<_>, _>>()
3128            .map_err(Error::from)
3129    }
3130
3131    /// Get all checkpoints (for full export).
3132    ///
3133    /// # Errors
3134    ///
3135    /// Returns an error if the query fails.
3136    pub fn get_all_checkpoints(&self) -> Result<Vec<Checkpoint>> {
3137        let mut stmt = self.conn.prepare(
3138            "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
3139                    (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
3140             FROM checkpoints c ORDER BY c.created_at ASC",
3141        )?;
3142        let rows = stmt.query_map([], |row| {
3143            Ok(Checkpoint {
3144                id: row.get(0)?,
3145                session_id: row.get(1)?,
3146                name: row.get(2)?,
3147                description: row.get(3)?,
3148                git_status: row.get(4)?,
3149                git_branch: row.get(5)?,
3150                created_at: row.get(6)?,
3151                item_count: row.get(7)?,
3152            })
3153        })?;
3154        rows.collect::<std::result::Result<Vec<_>, _>>()
3155            .map_err(Error::from)
3156    }
3157
3158    /// Get a context item by ID.
3159    ///
3160    /// # Errors
3161    ///
3162    /// Returns an error if the query fails.
3163    pub fn get_context_item(&self, id: &str) -> Result<Option<ContextItem>> {
3164        let mut stmt = self.conn.prepare(
3165            "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
3166             FROM context_items WHERE id = ?1",
3167        )?;
3168        let item = stmt
3169            .query_row([id], |row| {
3170                Ok(ContextItem {
3171                    id: row.get(0)?,
3172                    session_id: row.get(1)?,
3173                    key: row.get(2)?,
3174                    value: row.get(3)?,
3175                    category: row.get(4)?,
3176                    priority: row.get(5)?,
3177                    channel: row.get(6)?,
3178                    tags: row.get(7)?,
3179                    size: row.get(8)?,
3180                    created_at: row.get(9)?,
3181                    updated_at: row.get(10)?,
3182                })
3183            })
3184            .optional()?;
3185        Ok(item)
3186    }
3187
3188    // ======================
3189    // Project-Scoped Queries (for sync export)
3190    // ======================
3191
3192    /// Get all sessions for a specific project.
3193    ///
3194    /// # Errors
3195    ///
3196    /// Returns an error if the query fails.
3197    pub fn get_sessions_by_project(&self, project_path: &str) -> Result<Vec<Session>> {
3198        let mut stmt = self.conn.prepare(
3199            "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
3200             FROM sessions WHERE project_path = ?1 ORDER BY created_at ASC",
3201        )?;
3202        let rows = stmt.query_map([project_path], |row| {
3203            Ok(Session {
3204                id: row.get(0)?,
3205                name: row.get(1)?,
3206                description: row.get(2)?,
3207                branch: row.get(3)?,
3208                channel: row.get(4)?,
3209                project_path: row.get(5)?,
3210                status: row.get(6)?,
3211                ended_at: row.get(7)?,
3212                created_at: row.get(8)?,
3213                updated_at: row.get(9)?,
3214            })
3215        })?;
3216        rows.collect::<std::result::Result<Vec<_>, _>>()
3217            .map_err(Error::from)
3218    }
3219
3220    /// Get all issues for a specific project.
3221    ///
3222    /// # Errors
3223    ///
3224    /// Returns an error if the query fails.
3225    pub fn get_issues_by_project(&self, project_path: &str) -> Result<Vec<Issue>> {
3226        let mut stmt = self.conn.prepare(
3227            "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
3228             FROM issues WHERE project_path = ?1 ORDER BY created_at ASC",
3229        )?;
3230        let rows = stmt.query_map([project_path], map_issue_row)?;
3231        rows.collect::<std::result::Result<Vec<_>, _>>()
3232            .map_err(Error::from)
3233    }
3234
3235    /// Get all context items for sessions in a specific project.
3236    ///
3237    /// Context items are linked to sessions, so we join on session_id
3238    /// and filter by the session's project_path.
3239    ///
3240    /// # Errors
3241    ///
3242    /// Returns an error if the query fails.
3243    pub fn get_context_items_by_project(&self, project_path: &str) -> Result<Vec<ContextItem>> {
3244        let mut stmt = self.conn.prepare(
3245            "SELECT ci.id, ci.session_id, ci.key, ci.value, ci.category, ci.priority, ci.channel, ci.tags, ci.size, ci.created_at, ci.updated_at
3246             FROM context_items ci
3247             INNER JOIN sessions s ON ci.session_id = s.id
3248             WHERE s.project_path = ?1
3249             ORDER BY ci.created_at ASC",
3250        )?;
3251        let rows = stmt.query_map([project_path], |row| {
3252            Ok(ContextItem {
3253                id: row.get(0)?,
3254                session_id: row.get(1)?,
3255                key: row.get(2)?,
3256                value: row.get(3)?,
3257                category: row.get(4)?,
3258                priority: row.get(5)?,
3259                channel: row.get(6)?,
3260                tags: row.get(7)?,
3261                size: row.get(8)?,
3262                created_at: row.get(9)?,
3263                updated_at: row.get(10)?,
3264            })
3265        })?;
3266        rows.collect::<std::result::Result<Vec<_>, _>>()
3267            .map_err(Error::from)
3268    }
3269
3270    /// Get all memory items for a specific project.
3271    ///
3272    /// # Errors
3273    ///
3274    /// Returns an error if the query fails.
3275    pub fn get_memory_by_project(&self, project_path: &str) -> Result<Vec<Memory>> {
3276        let mut stmt = self.conn.prepare(
3277            "SELECT id, project_path, key, value, category, created_at, updated_at
3278             FROM project_memory WHERE project_path = ?1 ORDER BY created_at ASC",
3279        )?;
3280        let rows = stmt.query_map([project_path], |row| {
3281            Ok(Memory {
3282                id: row.get(0)?,
3283                project_path: row.get(1)?,
3284                key: row.get(2)?,
3285                value: row.get(3)?,
3286                category: row.get(4)?,
3287                created_at: row.get(5)?,
3288                updated_at: row.get(6)?,
3289            })
3290        })?;
3291        rows.collect::<std::result::Result<Vec<_>, _>>()
3292            .map_err(Error::from)
3293    }
3294
3295    /// Get all checkpoints for sessions in a specific project.
3296    ///
3297    /// Checkpoints are linked to sessions, so we join on session_id
3298    /// and filter by the session's project_path.
3299    ///
3300    /// # Errors
3301    ///
3302    /// Returns an error if the query fails.
3303    pub fn get_checkpoints_by_project(&self, project_path: &str) -> Result<Vec<Checkpoint>> {
3304        let mut stmt = self.conn.prepare(
3305            "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
3306                    (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
3307             FROM checkpoints c
3308             INNER JOIN sessions s ON c.session_id = s.id
3309             WHERE s.project_path = ?1
3310             ORDER BY c.created_at ASC",
3311        )?;
3312        let rows = stmt.query_map([project_path], |row| {
3313            Ok(Checkpoint {
3314                id: row.get(0)?,
3315                session_id: row.get(1)?,
3316                name: row.get(2)?,
3317                description: row.get(3)?,
3318                git_status: row.get(4)?,
3319                git_branch: row.get(5)?,
3320                created_at: row.get(6)?,
3321                item_count: row.get(7)?,
3322            })
3323        })?;
3324        rows.collect::<std::result::Result<Vec<_>, _>>()
3325            .map_err(Error::from)
3326    }
3327
3328    /// Get dirty session IDs for a specific project.
3329    ///
3330    /// # Errors
3331    ///
3332    /// Returns an error if the query fails.
3333    pub fn get_dirty_sessions_by_project(&self, project_path: &str) -> Result<Vec<String>> {
3334        let mut stmt = self.conn.prepare(
3335            "SELECT ds.session_id
3336             FROM dirty_sessions ds
3337             INNER JOIN sessions s ON ds.session_id = s.id
3338             WHERE s.project_path = ?1",
3339        )?;
3340        let rows = stmt.query_map([project_path], |row| row.get(0))?;
3341        rows.collect::<std::result::Result<Vec<_>, _>>()
3342            .map_err(Error::from)
3343    }
3344
3345    /// Get dirty issue IDs for a specific project.
3346    ///
3347    /// # Errors
3348    ///
3349    /// Returns an error if the query fails.
3350    pub fn get_dirty_issues_by_project(&self, project_path: &str) -> Result<Vec<String>> {
3351        let mut stmt = self.conn.prepare(
3352            "SELECT di.issue_id
3353             FROM dirty_issues di
3354             INNER JOIN issues i ON di.issue_id = i.id
3355             WHERE i.project_path = ?1",
3356        )?;
3357        let rows = stmt.query_map([project_path], |row| row.get(0))?;
3358        rows.collect::<std::result::Result<Vec<_>, _>>()
3359            .map_err(Error::from)
3360    }
3361
3362    /// Get dirty context item IDs for a specific project.
3363    ///
3364    /// # Errors
3365    ///
3366    /// Returns an error if the query fails.
3367    pub fn get_dirty_context_items_by_project(&self, project_path: &str) -> Result<Vec<String>> {
3368        let mut stmt = self.conn.prepare(
3369            "SELECT dci.item_id
3370             FROM dirty_context_items dci
3371             INNER JOIN context_items ci ON dci.item_id = ci.id
3372             INNER JOIN sessions s ON ci.session_id = s.id
3373             WHERE s.project_path = ?1",
3374        )?;
3375        let rows = stmt.query_map([project_path], |row| row.get(0))?;
3376        rows.collect::<std::result::Result<Vec<_>, _>>()
3377            .map_err(Error::from)
3378    }
3379
3380    /// Backfill dirty tables with all records for a project.
3381    ///
3382    /// This is used on first sync export when no prior exports exist.
3383    /// It marks all existing records for the project as dirty so they
3384    /// get included in the initial export.
3385    ///
3386    /// # Errors
3387    ///
3388    /// Returns an error if the queries fail.
3389    pub fn backfill_dirty_for_project(&mut self, project_path: &str) -> Result<BackfillStats> {
3390        let now = chrono::Utc::now().timestamp_millis();
3391
3392        // Backfill sessions
3393        let sessions_count = self.conn.execute(
3394            "INSERT OR IGNORE INTO dirty_sessions (session_id, marked_at)
3395             SELECT id, ?1 FROM sessions WHERE project_path = ?2",
3396            rusqlite::params![now, project_path],
3397        )?;
3398
3399        // Backfill issues
3400        let issues_count = self.conn.execute(
3401            "INSERT OR IGNORE INTO dirty_issues (issue_id, marked_at)
3402             SELECT id, ?1 FROM issues WHERE project_path = ?2",
3403            rusqlite::params![now, project_path],
3404        )?;
3405
3406        // Backfill context items (via session join)
3407        let context_items_count = self.conn.execute(
3408            "INSERT OR IGNORE INTO dirty_context_items (item_id, marked_at)
3409             SELECT ci.id, ?1 FROM context_items ci
3410             INNER JOIN sessions s ON ci.session_id = s.id
3411             WHERE s.project_path = ?2",
3412            rusqlite::params![now, project_path],
3413        )?;
3414
3415        // Backfill plans
3416        let plans_count = self.conn.execute(
3417            "INSERT OR IGNORE INTO dirty_plans (plan_id, marked_at)
3418             SELECT id, ?1 FROM plans WHERE project_path = ?2",
3419            rusqlite::params![now, project_path],
3420        )?;
3421
3422        Ok(BackfillStats {
3423            sessions: sessions_count,
3424            issues: issues_count,
3425            context_items: context_items_count,
3426            plans: plans_count,
3427        })
3428    }
3429
3430    /// Get total record counts for a project (for status display).
3431    ///
3432    /// # Errors
3433    ///
3434    /// Returns an error if the query fails.
3435    pub fn get_project_counts(&self, project_path: &str) -> Result<ProjectCounts> {
3436        let sessions: i64 = self.conn.query_row(
3437            "SELECT COUNT(*) FROM sessions WHERE project_path = ?1",
3438            [project_path],
3439            |row| row.get(0),
3440        )?;
3441
3442        let issues: i64 = self.conn.query_row(
3443            "SELECT COUNT(*) FROM issues WHERE project_path = ?1",
3444            [project_path],
3445            |row| row.get(0),
3446        )?;
3447
3448        let context_items: i64 = self.conn.query_row(
3449            "SELECT COUNT(*) FROM context_items ci
3450             INNER JOIN sessions s ON ci.session_id = s.id
3451             WHERE s.project_path = ?1",
3452            [project_path],
3453            |row| row.get(0),
3454        )?;
3455
3456        let memories: i64 = self.conn.query_row(
3457            "SELECT COUNT(*) FROM project_memory WHERE project_path = ?1",
3458            [project_path],
3459            |row| row.get(0),
3460        )?;
3461
3462        let checkpoints: i64 = self.conn.query_row(
3463            "SELECT COUNT(*) FROM checkpoints c
3464             INNER JOIN sessions s ON c.session_id = s.id
3465             WHERE s.project_path = ?1",
3466            [project_path],
3467            |row| row.get(0),
3468        )?;
3469
3470        Ok(ProjectCounts {
3471            sessions: sessions as usize,
3472            issues: issues as usize,
3473            context_items: context_items as usize,
3474            memories: memories as usize,
3475            checkpoints: checkpoints as usize,
3476        })
3477    }
3478
3479    // ======================
3480    // Upsert Operations (for sync import)
3481    // ======================
3482
3483    /// Upsert a session (for sync import).
3484    ///
3485    /// This performs an INSERT OR REPLACE, preserving all fields from the imported record.
3486    ///
3487    /// # Errors
3488    ///
3489    /// Returns an error if the upsert fails.
3490    pub fn upsert_session(&mut self, session: &Session) -> Result<()> {
3491        self.conn.execute(
3492            "INSERT INTO sessions (id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at)
3493             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
3494             ON CONFLICT(id) DO UPDATE SET
3495               name = excluded.name,
3496               description = excluded.description,
3497               branch = excluded.branch,
3498               channel = excluded.channel,
3499               project_path = excluded.project_path,
3500               status = excluded.status,
3501               ended_at = excluded.ended_at,
3502               updated_at = excluded.updated_at",
3503            rusqlite::params![
3504                session.id,
3505                session.name,
3506                session.description,
3507                session.branch,
3508                session.channel,
3509                session.project_path,
3510                session.status,
3511                session.ended_at,
3512                session.created_at,
3513                session.updated_at,
3514            ],
3515        )?;
3516        Ok(())
3517    }
3518
3519    /// Upsert an issue (for sync import).
3520    ///
3521    /// # Errors
3522    ///
3523    /// Returns an error if the upsert fails.
3524    pub fn upsert_issue(&mut self, issue: &Issue) -> Result<()> {
3525        self.conn.execute(
3526            "INSERT INTO issues (id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at)
3527             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
3528             ON CONFLICT(id) DO UPDATE SET
3529               short_id = excluded.short_id,
3530               project_path = excluded.project_path,
3531               title = excluded.title,
3532               description = excluded.description,
3533               details = excluded.details,
3534               status = excluded.status,
3535               priority = excluded.priority,
3536               issue_type = excluded.issue_type,
3537               plan_id = excluded.plan_id,
3538               assigned_to_agent = excluded.assigned_to_agent,
3539               updated_at = excluded.updated_at,
3540               closed_at = excluded.closed_at",
3541            rusqlite::params![
3542                issue.id,
3543                issue.short_id,
3544                issue.project_path,
3545                issue.title,
3546                issue.description,
3547                issue.details,
3548                issue.status,
3549                issue.priority,
3550                issue.issue_type,
3551                issue.plan_id,
3552                issue.created_by_agent,
3553                issue.assigned_to_agent,
3554                issue.created_at,
3555                issue.updated_at,
3556                issue.closed_at,
3557            ],
3558        )?;
3559        Ok(())
3560    }
3561
3562    /// Upsert a context item (for sync import).
3563    ///
3564    /// # Errors
3565    ///
3566    /// Returns an error if the upsert fails.
3567    pub fn upsert_context_item(&mut self, item: &ContextItem) -> Result<()> {
3568        self.conn.execute(
3569            "INSERT INTO context_items (id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at)
3570             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
3571             ON CONFLICT(id) DO UPDATE SET
3572               key = excluded.key,
3573               value = excluded.value,
3574               category = excluded.category,
3575               priority = excluded.priority,
3576               channel = excluded.channel,
3577               tags = excluded.tags,
3578               size = excluded.size,
3579               updated_at = excluded.updated_at",
3580            rusqlite::params![
3581                item.id,
3582                item.session_id,
3583                item.key,
3584                item.value,
3585                item.category,
3586                item.priority,
3587                item.channel,
3588                item.tags,
3589                item.size,
3590                item.created_at,
3591                item.updated_at,
3592            ],
3593        )?;
3594        Ok(())
3595    }
3596
3597    /// Upsert a memory item (for sync import).
3598    ///
3599    /// # Errors
3600    ///
3601    /// Returns an error if the upsert fails.
3602    pub fn upsert_memory(&mut self, memory: &Memory) -> Result<()> {
3603        self.conn.execute(
3604            "INSERT INTO project_memory (id, project_path, key, value, category, created_at, updated_at)
3605             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
3606             ON CONFLICT(id) DO UPDATE SET
3607               key = excluded.key,
3608               value = excluded.value,
3609               category = excluded.category,
3610               updated_at = excluded.updated_at",
3611            rusqlite::params![
3612                memory.id,
3613                memory.project_path,
3614                memory.key,
3615                memory.value,
3616                memory.category,
3617                memory.created_at,
3618                memory.updated_at,
3619            ],
3620        )?;
3621        Ok(())
3622    }
3623
3624    /// Upsert a checkpoint (for sync import).
3625    ///
3626    /// Note: This does not import checkpoint items - those would need separate handling.
3627    ///
3628    /// # Errors
3629    ///
3630    /// Returns an error if the upsert fails.
3631    pub fn upsert_checkpoint(&mut self, checkpoint: &Checkpoint) -> Result<()> {
3632        self.conn.execute(
3633            "INSERT INTO checkpoints (id, session_id, name, description, git_status, git_branch, created_at)
3634             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
3635             ON CONFLICT(id) DO UPDATE SET
3636               name = excluded.name,
3637               description = excluded.description,
3638               git_status = excluded.git_status,
3639               git_branch = excluded.git_branch",
3640            rusqlite::params![
3641                checkpoint.id,
3642                checkpoint.session_id,
3643                checkpoint.name,
3644                checkpoint.description,
3645                checkpoint.git_status,
3646                checkpoint.git_branch,
3647                checkpoint.created_at,
3648            ],
3649        )?;
3650        Ok(())
3651    }
3652
3653    // ======================
3654    // Project Operations
3655    // ======================
3656
3657    /// Create a new project.
3658    ///
3659    /// # Errors
3660    ///
3661    /// Returns an error if the project already exists or the insert fails.
3662    pub fn create_project(&mut self, project: &Project, actor: &str) -> Result<()> {
3663        self.mutate("create_project", actor, |tx, ctx| {
3664            tx.execute(
3665                "INSERT INTO projects (id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at)
3666                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
3667                rusqlite::params![
3668                    project.id,
3669                    project.project_path,
3670                    project.name,
3671                    project.description,
3672                    project.issue_prefix,
3673                    project.next_issue_number,
3674                    project.plan_prefix,
3675                    project.next_plan_number,
3676                    project.created_at,
3677                    project.updated_at,
3678                ],
3679            )?;
3680
3681            ctx.record_event("project", &project.id, EventType::ProjectCreated);
3682            Ok(())
3683        })
3684    }
3685
3686    /// Get a project by ID.
3687    ///
3688    /// # Errors
3689    ///
3690    /// Returns an error if the query fails.
3691    pub fn get_project(&self, id: &str) -> Result<Option<Project>> {
3692        let project = self
3693            .conn
3694            .query_row(
3695                "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3696                 FROM projects WHERE id = ?1",
3697                [id],
3698                map_project_row,
3699            )
3700            .optional()?;
3701        Ok(project)
3702    }
3703
3704    /// Get a project by path.
3705    ///
3706    /// # Errors
3707    ///
3708    /// Returns an error if the query fails.
3709    pub fn get_project_by_path(&self, project_path: &str) -> Result<Option<Project>> {
3710        let project = self
3711            .conn
3712            .query_row(
3713                "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3714                 FROM projects WHERE project_path = ?1",
3715                [project_path],
3716                map_project_row,
3717            )
3718            .optional()?;
3719        Ok(project)
3720    }
3721
3722    /// List all projects.
3723    ///
3724    /// # Errors
3725    ///
3726    /// Returns an error if the query fails.
3727    pub fn list_projects(&self, limit: usize) -> Result<Vec<Project>> {
3728        let mut stmt = self.conn.prepare(
3729            "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3730             FROM projects
3731             ORDER BY updated_at DESC
3732             LIMIT ?1",
3733        )?;
3734
3735        let projects = stmt
3736            .query_map([limit], map_project_row)?
3737            .collect::<std::result::Result<Vec<_>, _>>()?;
3738
3739        Ok(projects)
3740    }
3741
3742    /// Update a project.
3743    ///
3744    /// # Errors
3745    ///
3746    /// Returns an error if the project doesn't exist or the update fails.
3747    pub fn update_project(
3748        &mut self,
3749        id: &str,
3750        name: Option<&str>,
3751        description: Option<&str>,
3752        issue_prefix: Option<&str>,
3753        actor: &str,
3754    ) -> Result<()> {
3755        self.mutate("update_project", actor, |tx, ctx| {
3756            let now = chrono::Utc::now().timestamp_millis();
3757
3758            // Build dynamic update query
3759            let mut updates = vec!["updated_at = ?1"];
3760            let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
3761            let mut param_idx = 2;
3762
3763            if let Some(n) = name {
3764                updates.push(format!("name = ?{param_idx}").leak());
3765                params.push(Box::new(n.to_string()));
3766                param_idx += 1;
3767            }
3768
3769            if let Some(d) = description {
3770                updates.push(format!("description = ?{param_idx}").leak());
3771                params.push(Box::new(d.to_string()));
3772                param_idx += 1;
3773            }
3774
3775            if let Some(p) = issue_prefix {
3776                updates.push(format!("issue_prefix = ?{param_idx}").leak());
3777                params.push(Box::new(p.to_string()));
3778                param_idx += 1;
3779            }
3780
3781            // Add the WHERE clause parameter
3782            params.push(Box::new(id.to_string()));
3783
3784            let sql = format!(
3785                "UPDATE projects SET {} WHERE id = ?{}",
3786                updates.join(", "),
3787                param_idx
3788            );
3789
3790            let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
3791            let affected = tx.execute(&sql, param_refs.as_slice())?;
3792
3793            if affected == 0 {
3794                return Err(Error::ProjectNotFound { id: id.to_string() });
3795            }
3796
3797            ctx.record_event("project", id, EventType::ProjectUpdated);
3798            Ok(())
3799        })
3800    }
3801
3802    /// Delete a project and all associated data.
3803    ///
3804    /// This cascades to delete:
3805    /// - All sessions (and their context items, checkpoints)
3806    /// - All issues
3807    /// - All plans
3808    /// - All project memory
3809    ///
3810    /// # Errors
3811    ///
3812    /// Returns an error if the project doesn't exist or deletion fails.
3813    pub fn delete_project(&mut self, id: &str, actor: &str) -> Result<()> {
3814        self.mutate("delete_project", actor, |tx, ctx| {
3815            // Get project path for cascading deletes
3816            let project_path: Option<String> = tx
3817                .query_row(
3818                    "SELECT project_path FROM projects WHERE id = ?1",
3819                    [id],
3820                    |row| row.get(0),
3821                )
3822                .optional()?;
3823
3824            let project_path = project_path.ok_or_else(|| Error::ProjectNotFound { id: id.to_string() })?;
3825
3826            // Delete sessions (cascades to context_items, checkpoints via FK)
3827            tx.execute(
3828                "DELETE FROM sessions WHERE project_path = ?1",
3829                [&project_path],
3830            )?;
3831
3832            // Delete issues
3833            tx.execute(
3834                "DELETE FROM issues WHERE project_path = ?1",
3835                [&project_path],
3836            )?;
3837
3838            // Delete plans
3839            tx.execute(
3840                "DELETE FROM plans WHERE project_path = ?1",
3841                [&project_path],
3842            )?;
3843
3844            // Delete project memory
3845            tx.execute(
3846                "DELETE FROM project_memory WHERE project_path = ?1",
3847                [&project_path],
3848            )?;
3849
3850            // Delete the project itself
3851            let affected = tx.execute("DELETE FROM projects WHERE id = ?1", [id])?;
3852
3853            if affected == 0 {
3854                return Err(Error::ProjectNotFound { id: id.to_string() });
3855            }
3856
3857            ctx.record_event("project", id, EventType::ProjectDeleted);
3858            Ok(())
3859        })
3860    }
3861
3862    /// Get or create a project for the given path.
3863    ///
3864    /// If a project already exists at the path, returns it.
3865    /// Otherwise, creates a new project with a name derived from the path.
3866    ///
3867    /// # Errors
3868    ///
3869    /// Returns an error if the database operation fails.
3870    pub fn get_or_create_project(&mut self, project_path: &str, actor: &str) -> Result<Project> {
3871        // Check if project exists
3872        if let Some(project) = self.get_project_by_path(project_path)? {
3873            return Ok(project);
3874        }
3875
3876        // Create new project with name from path
3877        let name = std::path::Path::new(project_path)
3878            .file_name()
3879            .and_then(|n| n.to_str())
3880            .unwrap_or("Unknown Project")
3881            .to_string();
3882
3883        let project = Project::new(project_path.to_string(), name);
3884        self.create_project(&project, actor)?;
3885        Ok(project)
3886    }
3887
3888    /// Increment and return the next issue number for a project.
3889    ///
3890    /// # Errors
3891    ///
3892    /// Returns an error if the project doesn't exist or the update fails.
3893    pub fn get_next_issue_number(&mut self, project_path: &str) -> Result<i32> {
3894        let project = self
3895            .get_project_by_path(project_path)?
3896            .ok_or_else(|| Error::ProjectNotFound { id: project_path.to_string() })?;
3897
3898        let next_num = project.next_issue_number;
3899
3900        // Increment the counter
3901        self.conn.execute(
3902            "UPDATE projects SET next_issue_number = next_issue_number + 1, updated_at = ?1 WHERE project_path = ?2",
3903            rusqlite::params![chrono::Utc::now().timestamp_millis(), project_path],
3904        )?;
3905
3906        Ok(next_num)
3907    }
3908
3909    // ======================
3910    // Plan Operations
3911    // ======================
3912
3913    /// Create a new plan.
3914    ///
3915    /// # Errors
3916    ///
3917    /// Returns an error if the plan already exists or the insert fails.
3918    pub fn create_plan(&mut self, plan: &Plan, actor: &str) -> Result<()> {
3919        self.mutate("create_plan", actor, |tx, ctx| {
3920            tx.execute(
3921                "INSERT INTO plans (id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, source_path, source_hash, created_at, updated_at)
3922                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
3923                rusqlite::params![
3924                    plan.id,
3925                    plan.short_id,
3926                    plan.project_id,
3927                    plan.project_path,
3928                    plan.title,
3929                    plan.content,
3930                    plan.status.as_str(),
3931                    plan.success_criteria,
3932                    plan.session_id,
3933                    plan.created_in_session,
3934                    plan.source_path,
3935                    plan.source_hash,
3936                    plan.created_at,
3937                    plan.updated_at,
3938                ],
3939            )?;
3940
3941            ctx.record_event("plan", &plan.id, EventType::PlanCreated);
3942            Ok(())
3943        })
3944    }
3945
3946    /// Get a plan by ID.
3947    ///
3948    /// # Errors
3949    ///
3950    /// Returns an error if the query fails.
3951    pub fn get_plan(&self, id: &str) -> Result<Option<Plan>> {
3952        let plan = self
3953            .conn
3954            .query_row(
3955                "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
3956                 FROM plans WHERE id = ?1",
3957                [id],
3958                map_plan_row,
3959            )
3960            .optional()?;
3961        Ok(plan)
3962    }
3963
3964    /// List plans for a project.
3965    ///
3966    /// # Errors
3967    ///
3968    /// Returns an error if the query fails.
3969    pub fn list_plans(&self, project_path: &str, status: Option<&str>, limit: usize) -> Result<Vec<Plan>> {
3970        let sql = if let Some(status) = status {
3971            if status == "all" {
3972                "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
3973                 FROM plans WHERE project_path = ?1
3974                 ORDER BY updated_at DESC
3975                 LIMIT ?2".to_string()
3976            } else {
3977                format!(
3978                    "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
3979                     FROM plans WHERE project_path = ?1 AND status = '{}'
3980                     ORDER BY updated_at DESC
3981                     LIMIT ?2",
3982                    status
3983                )
3984            }
3985        } else {
3986            // Default: show active plans only
3987            "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
3988             FROM plans WHERE project_path = ?1 AND status = 'active'
3989             ORDER BY updated_at DESC
3990             LIMIT ?2".to_string()
3991        };
3992
3993        let mut stmt = self.conn.prepare(&sql)?;
3994        let plans = stmt
3995            .query_map(rusqlite::params![project_path, limit], map_plan_row)?
3996            .collect::<std::result::Result<Vec<_>, _>>()?;
3997
3998        Ok(plans)
3999    }
4000
4001    /// Update a plan.
4002    ///
4003    /// # Errors
4004    ///
4005    /// Returns an error if the plan doesn't exist or the update fails.
4006    pub fn update_plan(
4007        &mut self,
4008        id: &str,
4009        title: Option<&str>,
4010        content: Option<&str>,
4011        status: Option<&str>,
4012        success_criteria: Option<&str>,
4013        actor: &str,
4014    ) -> Result<()> {
4015        self.mutate("update_plan", actor, |tx, ctx| {
4016            let now = chrono::Utc::now().timestamp_millis();
4017
4018            // Build dynamic update query
4019            let mut updates = vec!["updated_at = ?1"];
4020            let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
4021            let mut param_idx = 2;
4022
4023            if let Some(t) = title {
4024                updates.push(format!("title = ?{param_idx}").leak());
4025                params.push(Box::new(t.to_string()));
4026                param_idx += 1;
4027            }
4028
4029            if let Some(c) = content {
4030                updates.push(format!("content = ?{param_idx}").leak());
4031                params.push(Box::new(c.to_string()));
4032                param_idx += 1;
4033            }
4034
4035            if let Some(s) = status {
4036                updates.push(format!("status = ?{param_idx}").leak());
4037                params.push(Box::new(s.to_string()));
4038                param_idx += 1;
4039
4040                // If marking completed, set completed_at
4041                if s == "completed" {
4042                    updates.push(format!("completed_at = ?{param_idx}").leak());
4043                    params.push(Box::new(now));
4044                    param_idx += 1;
4045                }
4046            }
4047
4048            if let Some(sc) = success_criteria {
4049                updates.push(format!("success_criteria = ?{param_idx}").leak());
4050                params.push(Box::new(sc.to_string()));
4051                param_idx += 1;
4052            }
4053
4054            // Add the WHERE clause parameter
4055            params.push(Box::new(id.to_string()));
4056
4057            let sql = format!(
4058                "UPDATE plans SET {} WHERE id = ?{}",
4059                updates.join(", "),
4060                param_idx
4061            );
4062
4063            let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
4064            let affected = tx.execute(&sql, param_refs.as_slice())?;
4065
4066            if affected == 0 {
4067                return Err(Error::Other(format!("Plan not found: {id}")));
4068            }
4069
4070            let event_type = if status == Some("completed") {
4071                EventType::PlanCompleted
4072            } else {
4073                EventType::PlanUpdated
4074            };
4075            ctx.record_event("plan", id, event_type);
4076            Ok(())
4077        })
4078    }
4079
4080    /// Get all plans for a specific project (for JSONL sync export).
4081    ///
4082    /// # Errors
4083    ///
4084    /// Returns an error if the query fails.
4085    pub fn get_plans_by_project(&self, project_path: &str) -> Result<Vec<Plan>> {
4086        let mut stmt = self.conn.prepare(
4087            "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
4088             FROM plans WHERE project_path = ?1 ORDER BY created_at ASC",
4089        )?;
4090        let rows = stmt.query_map([project_path], map_plan_row)?;
4091        let plans: Vec<Plan> = rows.collect::<std::result::Result<_, _>>()?;
4092        Ok(plans)
4093    }
4094
4095    /// Find a plan by source hash (for capture deduplication).
4096    ///
4097    /// # Errors
4098    ///
4099    /// Returns an error if the query fails.
4100    pub fn find_plan_by_source_hash(&self, source_hash: &str) -> Result<Option<Plan>> {
4101        let plan = self
4102            .conn
4103            .query_row(
4104                "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
4105                 FROM plans WHERE source_hash = ?1 LIMIT 1",
4106                [source_hash],
4107                map_plan_row,
4108            )
4109            .optional()?;
4110        Ok(plan)
4111    }
4112
4113    /// Upsert a plan (for sync import).
4114    ///
4115    /// # Errors
4116    ///
4117    /// Returns an error if the upsert fails.
4118    pub fn upsert_plan(&mut self, plan: &Plan) -> Result<()> {
4119        self.conn.execute(
4120            "INSERT INTO plans (id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at)
4121             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
4122             ON CONFLICT(id) DO UPDATE SET
4123               short_id = excluded.short_id,
4124               title = excluded.title,
4125               content = excluded.content,
4126               status = excluded.status,
4127               success_criteria = excluded.success_criteria,
4128               session_id = excluded.session_id,
4129               source_path = excluded.source_path,
4130               source_hash = excluded.source_hash,
4131               updated_at = excluded.updated_at,
4132               completed_at = excluded.completed_at",
4133            rusqlite::params![
4134                plan.id,
4135                plan.short_id,
4136                plan.project_id,
4137                plan.project_path,
4138                plan.title,
4139                plan.content,
4140                plan.status.as_str(),
4141                plan.success_criteria,
4142                plan.session_id,
4143                plan.created_in_session,
4144                plan.completed_in_session,
4145                plan.source_path,
4146                plan.source_hash,
4147                plan.created_at,
4148                plan.updated_at,
4149                plan.completed_at,
4150            ],
4151        )?;
4152        Ok(())
4153    }
4154
4155    /// Get dirty plan IDs by project (for JSONL sync export).
4156    ///
4157    /// # Errors
4158    ///
4159    /// Returns an error if the query fails.
4160    pub fn get_dirty_plans_by_project(&self, project_path: &str) -> Result<Vec<String>> {
4161        let mut stmt = self.conn.prepare(
4162            "SELECT dp.plan_id
4163             FROM dirty_plans dp
4164             INNER JOIN plans p ON dp.plan_id = p.id
4165             WHERE p.project_path = ?1",
4166        )?;
4167        let rows = stmt.query_map([project_path], |row| row.get(0))?;
4168        Ok(rows.collect::<std::result::Result<_, _>>()?)
4169    }
4170
4171    /// Clear dirty flags for plans after successful export.
4172    ///
4173    /// # Errors
4174    ///
4175    /// Returns an error if the delete fails.
4176    pub fn clear_dirty_plans(&mut self, ids: &[String]) -> Result<()> {
4177        if ids.is_empty() {
4178            return Ok(());
4179        }
4180        let placeholders = vec!["?"; ids.len()].join(",");
4181        let sql = format!("DELETE FROM dirty_plans WHERE plan_id IN ({placeholders})");
4182        let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
4183        self.conn.execute(&sql, params.as_slice())?;
4184        Ok(())
4185    }
4186
4187    // ======================
4188    // Embedding Operations
4189    // ======================
4190
4191    /// Store an embedding chunk for a context item.
4192    ///
4193    /// Embeddings are stored as BLOBs (binary f32 arrays).
4194    /// Large items may have multiple chunks for full semantic coverage.
4195    ///
4196    /// # Errors
4197    ///
4198    /// Returns an error if the insert fails.
4199    pub fn store_embedding_chunk(
4200        &mut self,
4201        id: &str,
4202        item_id: &str,
4203        chunk_index: i32,
4204        chunk_text: &str,
4205        embedding: &[f32],
4206        provider: &str,
4207        model: &str,
4208    ) -> Result<()> {
4209        let now = chrono::Utc::now().timestamp_millis();
4210        let dimensions = embedding.len() as i32;
4211
4212        // Convert f32 slice to bytes (little-endian)
4213        let blob: Vec<u8> = embedding
4214            .iter()
4215            .flat_map(|f| f.to_le_bytes())
4216            .collect();
4217
4218        self.conn.execute(
4219            "INSERT INTO embedding_chunks (id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at)
4220             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
4221             ON CONFLICT(item_id, chunk_index) DO UPDATE SET
4222               chunk_text = excluded.chunk_text,
4223               embedding = excluded.embedding,
4224               dimensions = excluded.dimensions,
4225               provider = excluded.provider,
4226               model = excluded.model,
4227               created_at = excluded.created_at",
4228            rusqlite::params![id, item_id, chunk_index, chunk_text, blob, dimensions, provider, model, now],
4229        )?;
4230
4231        // Update context_items embedding metadata
4232        self.conn.execute(
4233            "UPDATE context_items SET
4234               embedding_status = 'complete',
4235               embedding_provider = ?1,
4236               embedding_model = ?2,
4237               chunk_count = COALESCE(
4238                 (SELECT MAX(chunk_index) + 1 FROM embedding_chunks WHERE item_id = ?3),
4239                 1
4240               ),
4241               embedded_at = ?4
4242             WHERE id = ?3",
4243            rusqlite::params![provider, model, item_id, now],
4244        )?;
4245
4246        Ok(())
4247    }
4248
4249    /// Get embedding chunks for a context item.
4250    ///
4251    /// # Errors
4252    ///
4253    /// Returns an error if the query fails.
4254    pub fn get_embedding_chunks(&self, item_id: &str) -> Result<Vec<EmbeddingChunk>> {
4255        let mut stmt = self.conn.prepare(
4256            "SELECT id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at
4257             FROM embedding_chunks
4258             WHERE item_id = ?1
4259             ORDER BY chunk_index ASC",
4260        )?;
4261
4262        let rows = stmt.query_map([item_id], |row| {
4263            let blob: Vec<u8> = row.get(4)?;
4264            let dimensions: i32 = row.get(5)?;
4265
4266            // Convert bytes back to f32 vec
4267            let embedding: Vec<f32> = blob
4268                .chunks_exact(4)
4269                .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
4270                .collect();
4271
4272            Ok(EmbeddingChunk {
4273                id: row.get(0)?,
4274                item_id: row.get(1)?,
4275                chunk_index: row.get(2)?,
4276                chunk_text: row.get(3)?,
4277                embedding,
4278                dimensions: dimensions as usize,
4279                provider: row.get(6)?,
4280                model: row.get(7)?,
4281                created_at: row.get(8)?,
4282            })
4283        })?;
4284
4285        rows.collect::<std::result::Result<Vec<_>, _>>()
4286            .map_err(Error::from)
4287    }
4288
4289    /// Get context items without embeddings (for backfill).
4290    ///
4291    /// # Errors
4292    ///
4293    /// Returns an error if the query fails.
4294    pub fn get_items_without_embeddings(
4295        &self,
4296        session_id: Option<&str>,
4297        limit: Option<u32>,
4298    ) -> Result<Vec<ContextItem>> {
4299        let limit = limit.unwrap_or(100);
4300
4301        let sql = if let Some(sid) = session_id {
4302            format!(
4303                "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4304                 FROM context_items
4305                 WHERE session_id = '{}' AND (embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error'))
4306                 ORDER BY created_at DESC
4307                 LIMIT {}",
4308                sid, limit
4309            )
4310        } else {
4311            format!(
4312                "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4313                 FROM context_items
4314                 WHERE embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error')
4315                 ORDER BY created_at DESC
4316                 LIMIT {}",
4317                limit
4318            )
4319        };
4320
4321        let mut stmt = self.conn.prepare(&sql)?;
4322        let rows = stmt.query_map([], |row| {
4323            Ok(ContextItem {
4324                id: row.get(0)?,
4325                session_id: row.get(1)?,
4326                key: row.get(2)?,
4327                value: row.get(3)?,
4328                category: row.get(4)?,
4329                priority: row.get(5)?,
4330                channel: row.get(6)?,
4331                tags: row.get(7)?,
4332                size: row.get(8)?,
4333                created_at: row.get(9)?,
4334                updated_at: row.get(10)?,
4335            })
4336        })?;
4337
4338        rows.collect::<std::result::Result<Vec<_>, _>>()
4339            .map_err(Error::from)
4340    }
4341
4342    /// Count items with and without embeddings.
4343    ///
4344    /// # Errors
4345    ///
4346    /// Returns an error if the query fails.
4347    pub fn count_embedding_status(&self, session_id: Option<&str>) -> Result<EmbeddingStats> {
4348        let (with_embeddings, without_embeddings) = if let Some(sid) = session_id {
4349            let with: i64 = self.conn.query_row(
4350                "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND embedding_status = 'complete'",
4351                [sid],
4352                |row| row.get(0),
4353            )?;
4354            let without: i64 = self.conn.query_row(
4355                "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND (embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error'))",
4356                [sid],
4357                |row| row.get(0),
4358            )?;
4359            (with, without)
4360        } else {
4361            let with: i64 = self.conn.query_row(
4362                "SELECT COUNT(*) FROM context_items WHERE embedding_status = 'complete'",
4363                [],
4364                |row| row.get(0),
4365            )?;
4366            let without: i64 = self.conn.query_row(
4367                "SELECT COUNT(*) FROM context_items WHERE embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error')",
4368                [],
4369                |row| row.get(0),
4370            )?;
4371            (with, without)
4372        };
4373
4374        Ok(EmbeddingStats {
4375            with_embeddings: with_embeddings as usize,
4376            without_embeddings: without_embeddings as usize,
4377        })
4378    }
4379
4380    /// Resync embedding status for items claiming 'complete' but lacking actual data.
4381    ///
4382    /// Migration 011 dropped the old vec_context_chunks table and reset statuses to
4383    /// 'pending', but subsequent logic set them back to 'complete' without actual
4384    /// embedding data. This method detects and fixes that mismatch.
4385    ///
4386    /// Returns the number of items reset.
4387    ///
4388    /// # Errors
4389    ///
4390    /// Returns an error if the query fails.
4391    pub fn resync_embedding_status(&self) -> Result<usize> {
4392        let count = self.conn.execute(
4393            "UPDATE context_items SET embedding_status = 'pending'
4394             WHERE embedding_status = 'complete'
4395             AND id NOT IN (SELECT DISTINCT item_id FROM embedding_chunks)",
4396            [],
4397        )?;
4398        Ok(count)
4399    }
4400
4401    /// Perform semantic search using cosine similarity.
4402    ///
4403    /// This is a brute-force search that computes cosine similarity
4404    /// between the query embedding and all stored embeddings.
4405    /// Efficient for <50K items; use Hora for larger datasets.
4406    ///
4407    /// # Errors
4408    ///
4409    /// Returns an error if the query fails.
4410    pub fn semantic_search(
4411        &self,
4412        query_embedding: &[f32],
4413        session_id: Option<&str>,
4414        limit: usize,
4415        threshold: f32,
4416    ) -> Result<Vec<SemanticSearchResult>> {
4417        // Get all embedding chunks (optionally filtered by session)
4418        let sql = if let Some(sid) = session_id {
4419            format!(
4420                "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4421                        ci.key, ci.value, ci.category, ci.priority
4422                 FROM embedding_chunks ec
4423                 INNER JOIN context_items ci ON ec.item_id = ci.id
4424                 WHERE ci.session_id = '{}'",
4425                sid
4426            )
4427        } else {
4428            "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4429                    ci.key, ci.value, ci.category, ci.priority
4430             FROM embedding_chunks ec
4431             INNER JOIN context_items ci ON ec.item_id = ci.id".to_string()
4432        };
4433
4434        let mut stmt = self.conn.prepare(&sql)?;
4435        let rows = stmt.query_map([], |row| {
4436            let blob: Vec<u8> = row.get(4)?;
4437            let embedding: Vec<f32> = blob
4438                .chunks_exact(4)
4439                .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
4440                .collect();
4441
4442            Ok((
4443                row.get::<_, String>(1)?, // item_id
4444                row.get::<_, i32>(2)?,    // chunk_index
4445                row.get::<_, String>(3)?, // chunk_text
4446                embedding,
4447                row.get::<_, String>(6)?, // key
4448                row.get::<_, String>(7)?, // value
4449                row.get::<_, String>(8)?, // category
4450                row.get::<_, String>(9)?, // priority
4451            ))
4452        })?;
4453
4454        // Compute similarities and collect results
4455        let mut results: Vec<SemanticSearchResult> = rows
4456            .filter_map(|row| row.ok())
4457            .map(|(item_id, chunk_index, chunk_text, embedding, key, value, category, priority)| {
4458                let similarity = cosine_similarity(query_embedding, &embedding);
4459                SemanticSearchResult {
4460                    item_id,
4461                    chunk_index,
4462                    chunk_text,
4463                    similarity,
4464                    key,
4465                    value,
4466                    category,
4467                    priority,
4468                }
4469            })
4470            .filter(|r| r.similarity >= threshold)
4471            .collect();
4472
4473        // Sort by similarity (highest first)
4474        results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap_or(std::cmp::Ordering::Equal));
4475
4476        // Take top N results, deduplicating by item_id (keep highest similarity chunk)
4477        let mut seen_items = std::collections::HashSet::new();
4478        let deduped: Vec<SemanticSearchResult> = results
4479            .into_iter()
4480            .filter(|r| seen_items.insert(r.item_id.clone()))
4481            .take(limit)
4482            .collect();
4483
4484        Ok(deduped)
4485    }
4486
4487    /// Delete embeddings for a context item.
4488    ///
4489    /// # Errors
4490    ///
4491    /// Returns an error if the delete fails.
4492    pub fn delete_embeddings(&mut self, item_id: &str) -> Result<()> {
4493        self.conn.execute(
4494            "DELETE FROM embedding_chunks WHERE item_id = ?1",
4495            [item_id],
4496        )?;
4497
4498        self.conn.execute(
4499            "UPDATE context_items SET
4500               embedding_status = 'none',
4501               embedding_provider = NULL,
4502               embedding_model = NULL,
4503               chunk_count = 0,
4504               embedded_at = NULL
4505             WHERE id = ?1",
4506            [item_id],
4507        )?;
4508
4509        Ok(())
4510    }
4511
4512    /// Get embedding metadata (provider, model, dimensions).
4513    ///
4514    /// # Errors
4515    ///
4516    /// Returns an error if the query fails.
4517    pub fn get_embedding_meta(&self, key: &str) -> Result<Option<String>> {
4518        let value = self.conn.query_row(
4519            "SELECT value FROM embeddings_meta WHERE key = ?1",
4520            [key],
4521            |row| row.get(0),
4522        ).optional()?;
4523        Ok(value)
4524    }
4525
4526    /// Set embedding metadata.
4527    ///
4528    /// # Errors
4529    ///
4530    /// Returns an error if the upsert fails.
4531    pub fn set_embedding_meta(&mut self, key: &str, value: &str) -> Result<()> {
4532        let now = chrono::Utc::now().timestamp_millis();
4533        self.conn.execute(
4534            "INSERT INTO embeddings_meta (key, value, updated_at)
4535             VALUES (?1, ?2, ?3)
4536             ON CONFLICT(key) DO UPDATE SET
4537               value = excluded.value,
4538               updated_at = excluded.updated_at",
4539            rusqlite::params![key, value, now],
4540        )?;
4541        Ok(())
4542    }
4543
4544    // ========================================================================
4545    // Fast Tier Embeddings (2-tier architecture)
4546    // ========================================================================
4547
4548    /// Store a fast-tier embedding chunk (Model2Vec).
4549    ///
4550    /// Fast tier embeddings are stored separately for dimension isolation.
4551    /// These are generated inline on save for instant semantic search.
4552    ///
4553    /// # Errors
4554    ///
4555    /// Returns an error if the insert fails.
4556    pub fn store_fast_embedding_chunk(
4557        &mut self,
4558        id: &str,
4559        item_id: &str,
4560        chunk_index: i32,
4561        chunk_text: &str,
4562        embedding: &[f32],
4563        model: &str,
4564    ) -> Result<()> {
4565        let now = chrono::Utc::now().timestamp_millis();
4566        let dimensions = embedding.len() as i32;
4567
4568        // Convert f32 slice to bytes (little-endian)
4569        let blob: Vec<u8> = embedding
4570            .iter()
4571            .flat_map(|f| f.to_le_bytes())
4572            .collect();
4573
4574        self.conn.execute(
4575            "INSERT INTO embedding_chunks_fast (id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at)
4576             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'model2vec', ?7, ?8)
4577             ON CONFLICT(item_id, chunk_index) DO UPDATE SET
4578               chunk_text = excluded.chunk_text,
4579               embedding = excluded.embedding,
4580               dimensions = excluded.dimensions,
4581               model = excluded.model,
4582               created_at = excluded.created_at",
4583            rusqlite::params![id, item_id, chunk_index, chunk_text, blob, dimensions, model, now],
4584        )?;
4585
4586        // Update context_items fast embedding status
4587        self.conn.execute(
4588            "UPDATE context_items SET
4589               fast_embedding_status = 'complete',
4590               fast_embedded_at = ?1
4591             WHERE id = ?2",
4592            rusqlite::params![now, item_id],
4593        )?;
4594
4595        Ok(())
4596    }
4597
4598    /// Search fast-tier embeddings only.
4599    ///
4600    /// Returns candidates for tiered search or direct fast results.
4601    /// Fast tier is optimized for speed over accuracy.
4602    ///
4603    /// # Errors
4604    ///
4605    /// Returns an error if the query fails.
4606    pub fn search_fast_tier(
4607        &self,
4608        query_embedding: &[f32],
4609        session_id: Option<&str>,
4610        limit: usize,
4611        threshold: f32,
4612    ) -> Result<Vec<SemanticSearchResult>> {
4613        // Get all fast embedding chunks (optionally filtered by session)
4614        let sql = if let Some(sid) = session_id {
4615            format!(
4616                "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4617                        ci.key, ci.value, ci.category, ci.priority
4618                 FROM embedding_chunks_fast ec
4619                 INNER JOIN context_items ci ON ec.item_id = ci.id
4620                 WHERE ci.session_id = '{}'",
4621                sid
4622            )
4623        } else {
4624            "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4625                    ci.key, ci.value, ci.category, ci.priority
4626             FROM embedding_chunks_fast ec
4627             INNER JOIN context_items ci ON ec.item_id = ci.id".to_string()
4628        };
4629
4630        let mut stmt = self.conn.prepare(&sql)?;
4631        let rows = stmt.query_map([], |row| {
4632            let blob: Vec<u8> = row.get(4)?;
4633            let embedding: Vec<f32> = blob
4634                .chunks_exact(4)
4635                .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
4636                .collect();
4637
4638            Ok((
4639                row.get::<_, String>(1)?, // item_id
4640                row.get::<_, i32>(2)?,    // chunk_index
4641                row.get::<_, String>(3)?, // chunk_text
4642                embedding,
4643                row.get::<_, String>(6)?, // key
4644                row.get::<_, String>(7)?, // value
4645                row.get::<_, String>(8)?, // category
4646                row.get::<_, String>(9)?, // priority
4647            ))
4648        })?;
4649
4650        // Compute similarities and collect results
4651        let mut results: Vec<SemanticSearchResult> = rows
4652            .filter_map(|row| row.ok())
4653            .map(|(item_id, chunk_index, chunk_text, embedding, key, value, category, priority)| {
4654                let similarity = cosine_similarity(query_embedding, &embedding);
4655                SemanticSearchResult {
4656                    item_id,
4657                    chunk_index,
4658                    chunk_text,
4659                    similarity,
4660                    key,
4661                    value,
4662                    category,
4663                    priority,
4664                }
4665            })
4666            .filter(|r| r.similarity >= threshold)
4667            .collect();
4668
4669        // Sort by similarity (highest first)
4670        results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap_or(std::cmp::Ordering::Equal));
4671
4672        // Take top N results, deduplicating by item_id
4673        let mut seen_items = std::collections::HashSet::new();
4674        let deduped: Vec<SemanticSearchResult> = results
4675            .into_iter()
4676            .filter(|r| seen_items.insert(r.item_id.clone()))
4677            .take(limit)
4678            .collect();
4679
4680        Ok(deduped)
4681    }
4682
4683    /// Get context items with fast embeddings but no quality embeddings.
4684    ///
4685    /// Used by background quality upgrade process.
4686    ///
4687    /// # Errors
4688    ///
4689    /// Returns an error if the query fails.
4690    pub fn get_items_needing_quality_upgrade(
4691        &self,
4692        session_id: Option<&str>,
4693        limit: Option<u32>,
4694    ) -> Result<Vec<ContextItem>> {
4695        let limit = limit.unwrap_or(100);
4696
4697        let sql = if let Some(sid) = session_id {
4698            format!(
4699                "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4700                 FROM context_items
4701                 WHERE session_id = '{}'
4702                   AND fast_embedding_status = 'complete'
4703                   AND (embedding_status IS NULL OR embedding_status = 'none' OR embedding_status = 'pending')
4704                 ORDER BY created_at DESC
4705                 LIMIT {}",
4706                sid, limit
4707            )
4708        } else {
4709            format!(
4710                "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4711                 FROM context_items
4712                 WHERE fast_embedding_status = 'complete'
4713                   AND (embedding_status IS NULL OR embedding_status = 'none' OR embedding_status = 'pending')
4714                 ORDER BY created_at DESC
4715                 LIMIT {}",
4716                limit
4717            )
4718        };
4719
4720        let mut stmt = self.conn.prepare(&sql)?;
4721        let rows = stmt.query_map([], |row| {
4722            Ok(ContextItem {
4723                id: row.get(0)?,
4724                session_id: row.get(1)?,
4725                key: row.get(2)?,
4726                value: row.get(3)?,
4727                category: row.get(4)?,
4728                priority: row.get(5)?,
4729                channel: row.get(6)?,
4730                tags: row.get(7)?,
4731                size: row.get(8)?,
4732                created_at: row.get(9)?,
4733                updated_at: row.get(10)?,
4734            })
4735        })?;
4736
4737        rows.collect::<std::result::Result<Vec<_>, _>>()
4738            .map_err(Error::from)
4739    }
4740
4741    /// Delete fast-tier embeddings for a context item.
4742    ///
4743    /// # Errors
4744    ///
4745    /// Returns an error if the delete fails.
4746    pub fn delete_fast_embeddings(&mut self, item_id: &str) -> Result<()> {
4747        self.conn.execute(
4748            "DELETE FROM embedding_chunks_fast WHERE item_id = ?1",
4749            [item_id],
4750        )?;
4751
4752        self.conn.execute(
4753            "UPDATE context_items SET
4754               fast_embedding_status = 'none',
4755               fast_embedded_at = NULL
4756             WHERE id = ?1",
4757            [item_id],
4758        )?;
4759
4760        Ok(())
4761    }
4762
4763    /// Count fast embedding status.
4764    ///
4765    /// # Errors
4766    ///
4767    /// Returns an error if the query fails.
4768    pub fn count_fast_embedding_status(&self, session_id: Option<&str>) -> Result<EmbeddingStats> {
4769        let (with_embeddings, without_embeddings) = if let Some(sid) = session_id {
4770            let with: i64 = self.conn.query_row(
4771                "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND fast_embedding_status = 'complete'",
4772                [sid],
4773                |row| row.get(0),
4774            )?;
4775            let without: i64 = self.conn.query_row(
4776                "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND (fast_embedding_status IS NULL OR fast_embedding_status = 'none')",
4777                [sid],
4778                |row| row.get(0),
4779            )?;
4780            (with, without)
4781        } else {
4782            let with: i64 = self.conn.query_row(
4783                "SELECT COUNT(*) FROM context_items WHERE fast_embedding_status = 'complete'",
4784                [],
4785                |row| row.get(0),
4786            )?;
4787            let without: i64 = self.conn.query_row(
4788                "SELECT COUNT(*) FROM context_items WHERE fast_embedding_status IS NULL OR fast_embedding_status = 'none'",
4789                [],
4790                |row| row.get(0),
4791            )?;
4792            (with, without)
4793        };
4794
4795        Ok(EmbeddingStats {
4796            with_embeddings: with_embeddings as usize,
4797            without_embeddings: without_embeddings as usize,
4798        })
4799    }
4800}
4801
4802// Helper to map plan rows
4803fn map_plan_row(row: &rusqlite::Row) -> rusqlite::Result<Plan> {
4804    let status_str: String = row.get(6)?;
4805    Ok(Plan {
4806        id: row.get(0)?,
4807        short_id: row.get(1)?,
4808        project_id: row.get(2)?,
4809        project_path: row.get(3)?,
4810        title: row.get(4)?,
4811        content: row.get(5)?,
4812        status: PlanStatus::from_str(&status_str),
4813        success_criteria: row.get(7)?,
4814        session_id: row.get(8)?,
4815        created_in_session: row.get(9)?,
4816        completed_in_session: row.get(10)?,
4817        source_path: row.get(11)?,
4818        source_hash: row.get(12)?,
4819        created_at: row.get(13)?,
4820        updated_at: row.get(14)?,
4821        completed_at: row.get(15)?,
4822    })
4823}
4824
4825// Helper to map project rows
4826fn map_project_row(row: &rusqlite::Row) -> rusqlite::Result<Project> {
4827    Ok(Project {
4828        id: row.get(0)?,
4829        project_path: row.get(1)?,
4830        name: row.get(2)?,
4831        description: row.get(3)?,
4832        issue_prefix: row.get(4)?,
4833        next_issue_number: row.get(5)?,
4834        plan_prefix: row.get(6)?,
4835        next_plan_number: row.get(7)?,
4836        created_at: row.get(8)?,
4837        updated_at: row.get(9)?,
4838    })
4839}
4840
4841// Helper to map issue rows
4842fn map_issue_row(row: &rusqlite::Row) -> rusqlite::Result<Issue> {
4843    Ok(Issue {
4844        id: row.get(0)?,
4845        short_id: row.get(1)?,
4846        project_path: row.get(2)?,
4847        title: row.get(3)?,
4848        description: row.get(4)?,
4849        details: row.get(5)?,
4850        status: row.get(6)?,
4851        priority: row.get(7)?,
4852        issue_type: row.get(8)?,
4853        plan_id: row.get(9)?,
4854        created_by_agent: row.get(10)?,
4855        assigned_to_agent: row.get(11)?,
4856        created_at: row.get(12)?,
4857        updated_at: row.get(13)?,
4858        closed_at: row.get(14)?,
4859    })
4860}
4861
4862// ==================
4863// Data Structures
4864// ==================
4865
4866/// A session record.
4867#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4868pub struct Session {
4869    pub id: String,
4870    pub name: String,
4871    pub description: Option<String>,
4872    pub branch: Option<String>,
4873    pub channel: Option<String>,
4874    pub project_path: Option<String>,
4875    pub status: String,
4876    pub ended_at: Option<i64>,
4877    pub created_at: i64,
4878    pub updated_at: i64,
4879}
4880
4881/// A context item record.
4882#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4883pub struct ContextItem {
4884    pub id: String,
4885    pub session_id: String,
4886    pub key: String,
4887    pub value: String,
4888    pub category: String,
4889    pub priority: String,
4890    pub channel: Option<String>,
4891    pub tags: Option<String>,
4892    pub size: i64,
4893    pub created_at: i64,
4894    pub updated_at: i64,
4895}
4896
4897/// An issue record.
4898/// Note: Parent-child relationships are stored in issue_dependencies table.
4899#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4900pub struct Issue {
4901    pub id: String,
4902    pub short_id: Option<String>,
4903    pub project_path: String,
4904    pub title: String,
4905    pub description: Option<String>,
4906    pub details: Option<String>,
4907    pub status: String,
4908    pub priority: i32,
4909    pub issue_type: String,
4910    pub plan_id: Option<String>,
4911    pub created_by_agent: Option<String>,
4912    pub assigned_to_agent: Option<String>,
4913    pub created_at: i64,
4914    pub updated_at: i64,
4915    pub closed_at: Option<i64>,
4916}
4917
4918/// Progress tracking for an epic (child issue counts by status).
4919#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
4920pub struct EpicProgress {
4921    pub total: usize,
4922    pub closed: usize,
4923    pub in_progress: usize,
4924    pub open: usize,
4925    pub blocked: usize,
4926    pub deferred: usize,
4927}
4928
4929/// A checkpoint record.
4930#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4931pub struct Checkpoint {
4932    pub id: String,
4933    pub session_id: String,
4934    pub name: String,
4935    pub description: Option<String>,
4936    pub git_status: Option<String>,
4937    pub git_branch: Option<String>,
4938    pub created_at: i64,
4939    pub item_count: i64,
4940}
4941
4942/// A memory record (project-level persistent storage).
4943#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4944pub struct Memory {
4945    pub id: String,
4946    pub project_path: String,
4947    pub key: String,
4948    pub value: String,
4949    pub category: String,
4950    pub created_at: i64,
4951    pub updated_at: i64,
4952}
4953
4954/// A sync deletion record (tracks what was deleted for sync).
4955#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4956pub struct SyncDeletion {
4957    /// Internal database ID.
4958    pub id: i64,
4959    /// The type of entity that was deleted (session, issue, etc.).
4960    pub entity_type: String,
4961    /// The ID of the deleted entity.
4962    pub entity_id: String,
4963    /// The project path this deletion belongs to.
4964    pub project_path: String,
4965    /// Unix timestamp (milliseconds) when the deletion occurred.
4966    pub deleted_at: i64,
4967    /// Actor who performed the deletion.
4968    pub deleted_by: String,
4969}
4970
4971/// An embedding chunk record.
4972///
4973/// Embeddings are stored as BLOB (binary f32 arrays) for efficiency.
4974/// Large context items may be split into multiple chunks for better semantic coverage.
4975#[derive(Debug, Clone, serde::Serialize)]
4976pub struct EmbeddingChunk {
4977    /// Unique ID for this chunk.
4978    pub id: String,
4979    /// The context item this chunk belongs to.
4980    pub item_id: String,
4981    /// Chunk index (0 for single-chunk items).
4982    pub chunk_index: i32,
4983    /// The text that was embedded.
4984    pub chunk_text: String,
4985    /// The embedding vector (f32 values).
4986    pub embedding: Vec<f32>,
4987    /// Number of dimensions in the embedding.
4988    pub dimensions: usize,
4989    /// Provider that generated this embedding (e.g., "ollama").
4990    pub provider: String,
4991    /// Model used for embedding (e.g., "nomic-embed-text").
4992    pub model: String,
4993    /// Unix timestamp (milliseconds) when created.
4994    pub created_at: i64,
4995}
4996
4997/// Embedding statistics for a session or globally.
4998#[derive(Debug, Clone, serde::Serialize)]
4999pub struct EmbeddingStats {
5000    /// Number of items with embeddings.
5001    pub with_embeddings: usize,
5002    /// Number of items without embeddings.
5003    pub without_embeddings: usize,
5004}
5005
5006/// A semantic search result.
5007#[derive(Debug, Clone, serde::Serialize)]
5008pub struct SemanticSearchResult {
5009    /// The context item ID.
5010    pub item_id: String,
5011    /// Which chunk matched (0 for single-chunk items).
5012    pub chunk_index: i32,
5013    /// The text that was matched.
5014    pub chunk_text: String,
5015    /// Cosine similarity score (0.0 to 1.0).
5016    pub similarity: f32,
5017    /// Context item key.
5018    pub key: String,
5019    /// Context item value.
5020    pub value: String,
5021    /// Context item category.
5022    pub category: String,
5023    /// Context item priority.
5024    pub priority: String,
5025}
5026
5027/// Compute cosine similarity between two vectors.
5028///
5029/// Returns a value between -1.0 and 1.0, where:
5030/// - 1.0 means identical direction
5031/// - 0.0 means orthogonal (no similarity)
5032/// - -1.0 means opposite direction
5033///
5034/// For normalized embeddings (which most models produce), this is equivalent
5035/// to the dot product.
5036fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
5037    if a.len() != b.len() || a.is_empty() {
5038        return 0.0;
5039    }
5040
5041    let mut dot_product = 0.0;
5042    let mut norm_a = 0.0;
5043    let mut norm_b = 0.0;
5044
5045    for (x, y) in a.iter().zip(b.iter()) {
5046        dot_product += x * y;
5047        norm_a += x * x;
5048        norm_b += y * y;
5049    }
5050
5051    let magnitude = (norm_a * norm_b).sqrt();
5052    if magnitude == 0.0 {
5053        0.0
5054    } else {
5055        dot_product / magnitude
5056    }
5057}
5058
5059/// Generate a short ID (4 hex chars based on timestamp).
5060fn generate_short_id() -> String {
5061    use std::time::{SystemTime, UNIX_EPOCH};
5062    let now = SystemTime::now()
5063        .duration_since(UNIX_EPOCH)
5064        .unwrap()
5065        .as_millis();
5066    format!("{:04x}", (now & 0xFFFF) as u16)
5067}
5068
5069#[cfg(test)]
5070mod tests {
5071    use super::*;
5072
5073    #[test]
5074    fn test_open_memory() {
5075        let storage = SqliteStorage::open_memory();
5076        assert!(storage.is_ok());
5077    }
5078
5079    #[test]
5080    fn test_session_crud() {
5081        let mut storage = SqliteStorage::open_memory().unwrap();
5082
5083        // Create
5084        storage
5085            .create_session(
5086                "sess_1",
5087                "Test Session",
5088                Some("A test session"),
5089                Some("/test/project"),
5090                Some("main"),
5091                "test-actor",
5092            )
5093            .unwrap();
5094
5095        // Read
5096        let session = storage.get_session("sess_1").unwrap();
5097        assert!(session.is_some());
5098        let session = session.unwrap();
5099        assert_eq!(session.name, "Test Session");
5100        assert_eq!(session.status, "active");
5101
5102        // List
5103        let sessions = storage
5104            .list_sessions(Some("/test/project"), None, None)
5105            .unwrap();
5106        assert_eq!(sessions.len(), 1);
5107
5108        // Update status
5109        storage
5110            .update_session_status("sess_1", "completed", "test-actor")
5111            .unwrap();
5112        let session = storage.get_session("sess_1").unwrap().unwrap();
5113        assert_eq!(session.status, "completed");
5114        assert!(session.ended_at.is_some());
5115    }
5116
5117    #[test]
5118    fn test_context_item_crud() {
5119        let mut storage = SqliteStorage::open_memory().unwrap();
5120
5121        // Create session first
5122        storage
5123            .create_session("sess_1", "Test", None, None, None, "actor")
5124            .unwrap();
5125
5126        // Save item
5127        storage
5128            .save_context_item(
5129                "item_1",
5130                "sess_1",
5131                "test-key",
5132                "test value",
5133                Some("note"),
5134                Some("high"),
5135                "actor",
5136            )
5137            .unwrap();
5138
5139        // Get items
5140        let items = storage.get_context_items("sess_1", None, None, None).unwrap();
5141        assert_eq!(items.len(), 1);
5142        assert_eq!(items[0].key, "test-key");
5143        assert_eq!(items[0].priority, "high");
5144
5145        // Update (upsert)
5146        storage
5147            .save_context_item(
5148                "item_1",
5149                "sess_1",
5150                "test-key",
5151                "updated value",
5152                Some("decision"),
5153                None,
5154                "actor",
5155            )
5156            .unwrap();
5157
5158        let items = storage.get_context_items("sess_1", None, None, None).unwrap();
5159        assert_eq!(items.len(), 1);
5160        assert_eq!(items[0].value, "updated value");
5161
5162        // Delete
5163        storage
5164            .delete_context_item("sess_1", "test-key", "actor")
5165            .unwrap();
5166        let items = storage.get_context_items("sess_1", None, None, None).unwrap();
5167        assert_eq!(items.len(), 0);
5168    }
5169
5170    #[test]
5171    fn test_issue_crud() {
5172        let mut storage = SqliteStorage::open_memory().unwrap();
5173
5174        // Create
5175        storage
5176            .create_issue(
5177                "issue_1",
5178                Some("TST-1"),
5179                "/test/project",
5180                "Test Issue",
5181                Some("Description"),
5182                None,         // details
5183                Some("task"), // issue_type
5184                Some(3),      // priority
5185                None,         // plan_id
5186                "actor",
5187            )
5188            .unwrap();
5189
5190        // Get by full ID
5191        let issue = storage.get_issue("issue_1", None).unwrap();
5192        assert!(issue.is_some());
5193        let issue = issue.unwrap();
5194        assert_eq!(issue.title, "Test Issue");
5195        assert_eq!(issue.priority, 3);
5196
5197        // Get by short ID
5198        let issue = storage
5199            .get_issue("TST-1", Some("/test/project"))
5200            .unwrap();
5201        assert!(issue.is_some());
5202
5203        // List
5204        let issues = storage
5205            .list_issues("/test/project", None, None, None)
5206            .unwrap();
5207        assert_eq!(issues.len(), 1);
5208
5209        // Claim
5210        storage.claim_issue("issue_1", "agent-1").unwrap();
5211        let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
5212        assert_eq!(issue.assigned_to_agent, Some("agent-1".to_string()));
5213        assert_eq!(issue.status, "in_progress");
5214
5215        // Release
5216        storage.release_issue("issue_1", "agent-1").unwrap();
5217        let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
5218        assert!(issue.assigned_to_agent.is_none());
5219        assert_eq!(issue.status, "open");
5220
5221        // Close
5222        storage
5223            .update_issue_status("issue_1", "closed", "actor")
5224            .unwrap();
5225        let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
5226        assert_eq!(issue.status, "closed");
5227        assert!(issue.closed_at.is_some());
5228    }
5229
5230    // --- Embeddings storage tests ---
5231
5232    #[test]
5233    fn test_get_items_without_embeddings_includes_pending() {
5234        let mut storage = SqliteStorage::open_memory().unwrap();
5235        storage
5236            .create_session("sess_1", "Test", None, None, None, "actor")
5237            .unwrap();
5238
5239        // Create items with different embedding statuses
5240        for (id, key, status) in [
5241            ("item_1", "none-status", "none"),
5242            ("item_2", "pending-status", "pending"),
5243            ("item_3", "error-status", "error"),
5244            ("item_4", "complete-status", "complete"),
5245        ] {
5246            storage
5247                .save_context_item(id, "sess_1", key, "test value", Some("note"), Some("normal"), "actor")
5248                .unwrap();
5249            storage.conn.execute(
5250                "UPDATE context_items SET embedding_status = ?1 WHERE id = ?2",
5251                rusqlite::params![status, id],
5252            ).unwrap();
5253        }
5254
5255        // Also create one with NULL status (never processed)
5256        storage
5257            .save_context_item("item_5", "sess_1", "null-status", "test", Some("note"), Some("normal"), "actor")
5258            .unwrap();
5259        storage.conn.execute(
5260            "UPDATE context_items SET embedding_status = NULL WHERE id = 'item_5'",
5261            [],
5262        ).unwrap();
5263
5264        let items = storage.get_items_without_embeddings(None, None).unwrap();
5265        let keys: Vec<&str> = items.iter().map(|i| i.key.as_str()).collect();
5266
5267        // Should include: none, pending, error, NULL
5268        assert!(keys.contains(&"none-status"), "missing 'none' status");
5269        assert!(keys.contains(&"pending-status"), "missing 'pending' status");
5270        assert!(keys.contains(&"error-status"), "missing 'error' status");
5271        assert!(keys.contains(&"null-status"), "missing NULL status");
5272
5273        // Should NOT include: complete
5274        assert!(!keys.contains(&"complete-status"), "'complete' should be excluded");
5275        assert_eq!(items.len(), 4);
5276    }
5277
5278    #[test]
5279    fn test_get_items_without_embeddings_session_filter() {
5280        let mut storage = SqliteStorage::open_memory().unwrap();
5281        storage.create_session("sess_1", "Session 1", None, None, None, "actor").unwrap();
5282        storage.create_session("sess_2", "Session 2", None, None, None, "actor").unwrap();
5283
5284        storage.save_context_item("item_1", "sess_1", "s1-item", "val", Some("note"), Some("normal"), "actor").unwrap();
5285        storage.save_context_item("item_2", "sess_2", "s2-item", "val", Some("note"), Some("normal"), "actor").unwrap();
5286
5287        // Reset both to pending
5288        storage.conn.execute("UPDATE context_items SET embedding_status = 'pending'", []).unwrap();
5289
5290        // Filter by session
5291        let s1_items = storage.get_items_without_embeddings(Some("sess_1"), None).unwrap();
5292        assert_eq!(s1_items.len(), 1);
5293        assert_eq!(s1_items[0].key, "s1-item");
5294
5295        // No filter returns both
5296        let all_items = storage.get_items_without_embeddings(None, None).unwrap();
5297        assert_eq!(all_items.len(), 2);
5298    }
5299
5300    #[test]
5301    fn test_resync_embedding_status() {
5302        let mut storage = SqliteStorage::open_memory().unwrap();
5303        storage.create_session("sess_1", "Test", None, None, None, "actor").unwrap();
5304
5305        // Create items
5306        storage.save_context_item("item_1", "sess_1", "phantom", "val", Some("note"), Some("normal"), "actor").unwrap();
5307        storage.save_context_item("item_2", "sess_1", "real", "val", Some("note"), Some("normal"), "actor").unwrap();
5308        storage.save_context_item("item_3", "sess_1", "pending-already", "val", Some("note"), Some("normal"), "actor").unwrap();
5309
5310        // Mark all as complete
5311        storage.conn.execute("UPDATE context_items SET embedding_status = 'complete'", []).unwrap();
5312        // Mark item_3 as pending (shouldn't be touched)
5313        storage.conn.execute("UPDATE context_items SET embedding_status = 'pending' WHERE id = 'item_3'", []).unwrap();
5314
5315        // Add actual embedding data ONLY for item_2
5316        storage.conn.execute(
5317            "INSERT INTO embedding_chunks (id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at)
5318             VALUES ('ec_1', 'item_2', 0, 'test', X'00000000', 1, 'test', 'test-model', 1000)",
5319            [],
5320        ).unwrap();
5321
5322        // Resync: item_1 claims complete but has no data -> should reset to pending
5323        let count = storage.resync_embedding_status().unwrap();
5324        assert_eq!(count, 1, "only item_1 should be reset (phantom complete)");
5325
5326        // Verify states
5327        let status_1: String = storage.conn.query_row(
5328            "SELECT embedding_status FROM context_items WHERE id = 'item_1'", [], |r| r.get(0)
5329        ).unwrap();
5330        assert_eq!(status_1, "pending", "phantom complete should be reset");
5331
5332        let status_2: String = storage.conn.query_row(
5333            "SELECT embedding_status FROM context_items WHERE id = 'item_2'", [], |r| r.get(0)
5334        ).unwrap();
5335        assert_eq!(status_2, "complete", "real complete should be untouched");
5336
5337        let status_3: String = storage.conn.query_row(
5338            "SELECT embedding_status FROM context_items WHERE id = 'item_3'", [], |r| r.get(0)
5339        ).unwrap();
5340        assert_eq!(status_3, "pending", "already-pending should be untouched");
5341    }
5342}