rusty_beads/storage/
sqlite.rs

1//! SQLite storage implementation.
2
3use std::collections::HashMap;
4use std::path::Path;
5use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
6
7use chrono::{DateTime, Utc};
8use rusqlite::{params, Connection, OptionalExtension};
9
10use super::schema::{SCHEMA, SCHEMA_VERSION, set_schema_version};
11use super::{Result, Storage, StorageError};
12use crate::types::{
13    AgentState, BlockedIssue, Comment, Dependency, DependencyType, Event,
14    EventType, Issue, IssueFilter, IssueType, MolType, Status, Statistics,
15};
16
17/// SQLite-based storage implementation.
18pub struct SqliteStorage {
19    conn: Arc<Mutex<Connection>>,
20    closed: AtomicBool,
21}
22
23impl SqliteStorage {
24    /// Open or create a SQLite database at the given path.
25    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
26        let conn = Connection::open(path)?;
27        Self::initialize(conn)
28    }
29
30    /// Create an in-memory SQLite database.
31    pub fn in_memory() -> Result<Self> {
32        let conn = Connection::open_in_memory()?;
33        Self::initialize(conn)
34    }
35
36    fn initialize(conn: Connection) -> Result<Self> {
37        // Enable foreign keys and WAL mode
38        conn.execute_batch(
39            "PRAGMA foreign_keys = ON;
40             PRAGMA journal_mode = WAL;
41             PRAGMA busy_timeout = 5000;
42             PRAGMA synchronous = NORMAL;"
43        )?;
44
45        // Create schema
46        conn.execute_batch(SCHEMA)?;
47
48        // Set schema version
49        set_schema_version(&conn, SCHEMA_VERSION)?;
50
51        Ok(Self {
52            conn: Arc::new(Mutex::new(conn)),
53            closed: AtomicBool::new(false),
54        })
55    }
56
57    fn check_closed(&self) -> Result<()> {
58        if self.closed.load(Ordering::Acquire) {
59            return Err(StorageError::Closed);
60        }
61        Ok(())
62    }
63
64    fn with_conn<F, T>(&self, f: F) -> Result<T>
65    where
66        F: FnOnce(&Connection) -> Result<T>,
67    {
68        self.check_closed()?;
69        let conn = self.conn.lock().map_err(|e| StorageError::Other(e.to_string()))?;
70        f(&conn)
71    }
72
73    fn with_conn_mut<F, T>(&self, f: F) -> Result<T>
74    where
75        F: FnOnce(&mut Connection) -> Result<T>,
76    {
77        self.check_closed()?;
78        let mut conn = self.conn.lock().map_err(|e| StorageError::Other(e.to_string()))?;
79        f(&mut conn)
80    }
81
82    /// Record an event for an issue.
83    fn record_event(
84        conn: &Connection,
85        issue_id: &str,
86        event_type: EventType,
87        actor: &str,
88        old_value: Option<&str>,
89        new_value: Option<&str>,
90    ) -> Result<()> {
91        conn.execute(
92            "INSERT INTO events (issue_id, event_type, actor, old_value, new_value, created_at)
93             VALUES (?, ?, ?, ?, ?, ?)",
94            params![
95                issue_id,
96                event_type.as_str(),
97                actor,
98                old_value,
99                new_value,
100                Utc::now().to_rfc3339(),
101            ],
102        )?;
103        Ok(())
104    }
105
106    /// Mark an issue as dirty internally.
107    fn mark_dirty_internal(conn: &Connection, issue_id: &str) -> Result<()> {
108        conn.execute(
109            "INSERT OR REPLACE INTO dirty_issues (issue_id, marked_at) VALUES (?, ?)",
110            params![issue_id, Utc::now().to_rfc3339()],
111        )?;
112        Ok(())
113    }
114}
115
116impl Storage for SqliteStorage {
117    fn create_issue(&self, issue: &Issue) -> Result<()> {
118        self.with_conn(|conn| {
119            // Check if issue already exists
120            let exists: bool = conn.query_row(
121                "SELECT 1 FROM issues WHERE id = ?",
122                [&issue.id],
123                |_| Ok(true),
124            ).optional()?.unwrap_or(false);
125
126            if exists {
127                return Err(StorageError::AlreadyExists(issue.id.clone()));
128            }
129
130            conn.execute(
131                "INSERT INTO issues (
132                    id, content_hash, title, description, design, acceptance_criteria, notes,
133                    status, priority, issue_type, assignee, owner, estimated_minutes,
134                    created_at, created_by, updated_at, closed_at, close_reason,
135                    due_at, defer_until, external_ref, source_system,
136                    deleted_at, deleted_by, delete_reason,
137                    compaction_level, compacted_at, compacted_at_commit, original_size,
138                    agent_state, mol_type, hook_bead, role_bead, rig, last_activity,
139                    pinned, is_template, ephemeral
140                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
141                params![
142                    issue.id,
143                    issue.content_hash,
144                    issue.title,
145                    issue.description,
146                    issue.design,
147                    issue.acceptance_criteria,
148                    issue.notes,
149                    issue.status.as_str(),
150                    issue.priority,
151                    issue.issue_type.as_str(),
152                    issue.assignee,
153                    issue.owner,
154                    issue.estimated_minutes,
155                    issue.created_at.to_rfc3339(),
156                    issue.created_by,
157                    issue.updated_at.to_rfc3339(),
158                    issue.closed_at.map(|t| t.to_rfc3339()),
159                    issue.close_reason,
160                    issue.due_at.map(|t| t.to_rfc3339()),
161                    issue.defer_until.map(|t| t.to_rfc3339()),
162                    issue.external_ref,
163                    issue.source_system,
164                    issue.deleted_at.map(|t| t.to_rfc3339()),
165                    issue.deleted_by,
166                    issue.delete_reason,
167                    issue.compaction_level,
168                    issue.compacted_at.map(|t| t.to_rfc3339()),
169                    issue.compacted_at_commit,
170                    issue.original_size,
171                    issue.agent_state.map(|s| s.as_str()),
172                    issue.mol_type.map(|m| m.as_str()),
173                    issue.hook_bead,
174                    issue.role_bead,
175                    issue.rig,
176                    issue.last_activity.map(|t| t.to_rfc3339()),
177                    issue.pinned,
178                    issue.is_template,
179                    issue.ephemeral,
180                ],
181            )?;
182
183            // Add labels
184            for label in &issue.labels {
185                conn.execute(
186                    "INSERT OR IGNORE INTO labels (issue_id, label) VALUES (?, ?)",
187                    params![issue.id, label],
188                )?;
189            }
190
191            // Record event
192            Self::record_event(conn, &issue.id, EventType::Created, &issue.created_by, None, None)?;
193
194            // Mark dirty
195            Self::mark_dirty_internal(conn, &issue.id)?;
196
197            Ok(())
198        })
199    }
200
201    fn create_issues(&self, issues: &[Issue]) -> Result<()> {
202        self.with_conn_mut(|conn| {
203            let tx = conn.transaction()?;
204            for issue in issues {
205                // Insert issue (simplified for batch)
206                tx.execute(
207                    "INSERT INTO issues (
208                        id, content_hash, title, description, design, acceptance_criteria, notes,
209                        status, priority, issue_type, assignee, owner, estimated_minutes,
210                        created_at, created_by, updated_at, closed_at, close_reason,
211                        due_at, defer_until, external_ref, source_system,
212                        deleted_at, deleted_by, delete_reason,
213                        compaction_level, compacted_at, compacted_at_commit, original_size,
214                        agent_state, mol_type, hook_bead, role_bead, rig, last_activity,
215                        pinned, is_template, ephemeral
216                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
217                    params![
218                        issue.id,
219                        issue.content_hash,
220                        issue.title,
221                        issue.description,
222                        issue.design,
223                        issue.acceptance_criteria,
224                        issue.notes,
225                        issue.status.as_str(),
226                        issue.priority,
227                        issue.issue_type.as_str(),
228                        issue.assignee,
229                        issue.owner,
230                        issue.estimated_minutes,
231                        issue.created_at.to_rfc3339(),
232                        issue.created_by,
233                        issue.updated_at.to_rfc3339(),
234                        issue.closed_at.map(|t| t.to_rfc3339()),
235                        issue.close_reason,
236                        issue.due_at.map(|t| t.to_rfc3339()),
237                        issue.defer_until.map(|t| t.to_rfc3339()),
238                        issue.external_ref,
239                        issue.source_system,
240                        issue.deleted_at.map(|t| t.to_rfc3339()),
241                        issue.deleted_by,
242                        issue.delete_reason,
243                        issue.compaction_level,
244                        issue.compacted_at.map(|t| t.to_rfc3339()),
245                        issue.compacted_at_commit,
246                        issue.original_size,
247                        issue.agent_state.map(|s| s.as_str()),
248                        issue.mol_type.map(|m| m.as_str()),
249                        issue.hook_bead,
250                        issue.role_bead,
251                        issue.rig,
252                        issue.last_activity.map(|t| t.to_rfc3339()),
253                        issue.pinned,
254                        issue.is_template,
255                        issue.ephemeral,
256                    ],
257                )?;
258
259                // Add labels
260                for label in &issue.labels {
261                    tx.execute(
262                        "INSERT OR IGNORE INTO labels (issue_id, label) VALUES (?, ?)",
263                        params![issue.id, label],
264                    )?;
265                }
266
267                // Mark dirty
268                tx.execute(
269                    "INSERT OR REPLACE INTO dirty_issues (issue_id, marked_at) VALUES (?, ?)",
270                    params![issue.id, Utc::now().to_rfc3339()],
271                )?;
272            }
273            tx.commit()?;
274            Ok(())
275        })
276    }
277
278    fn get_issue(&self, id: &str) -> Result<Option<Issue>> {
279        self.with_conn(|conn| {
280            let issue = conn.query_row(
281                "SELECT * FROM issues WHERE id = ?",
282                [id],
283                |row| row_to_issue(row),
284            ).optional()?;
285
286            if let Some(mut issue) = issue {
287                // Load labels
288                let mut stmt = conn.prepare("SELECT label FROM labels WHERE issue_id = ?")?;
289                let labels: Vec<String> = stmt.query_map([id], |row| row.get(0))?
290                    .filter_map(|r| r.ok())
291                    .collect();
292                issue.labels = labels;
293                Ok(Some(issue))
294            } else {
295                Ok(None)
296            }
297        })
298    }
299
300    fn get_issue_by_external_ref(&self, external_ref: &str) -> Result<Option<Issue>> {
301        self.with_conn(|conn| {
302            let issue = conn.query_row(
303                "SELECT * FROM issues WHERE external_ref = ?",
304                [external_ref],
305                |row| row_to_issue(row),
306            ).optional()?;
307
308            if let Some(mut issue) = issue {
309                // Load labels
310                let mut stmt = conn.prepare("SELECT label FROM labels WHERE issue_id = ?")?;
311                let labels: Vec<String> = stmt.query_map([&issue.id], |row| row.get(0))?
312                    .filter_map(|r| r.ok())
313                    .collect();
314                issue.labels = labels;
315                Ok(Some(issue))
316            } else {
317                Ok(None)
318            }
319        })
320    }
321
322    fn update_issue(&self, issue: &Issue) -> Result<()> {
323        self.with_conn(|conn| {
324            let rows = conn.execute(
325                "UPDATE issues SET
326                    content_hash = ?, title = ?, description = ?, design = ?,
327                    acceptance_criteria = ?, notes = ?, status = ?, priority = ?,
328                    issue_type = ?, assignee = ?, owner = ?, estimated_minutes = ?,
329                    updated_at = ?, closed_at = ?, close_reason = ?, due_at = ?,
330                    defer_until = ?, external_ref = ?, source_system = ?,
331                    deleted_at = ?, deleted_by = ?, delete_reason = ?,
332                    compaction_level = ?, compacted_at = ?, compacted_at_commit = ?,
333                    original_size = ?, agent_state = ?, mol_type = ?,
334                    hook_bead = ?, role_bead = ?, rig = ?, last_activity = ?,
335                    pinned = ?, is_template = ?, ephemeral = ?
336                WHERE id = ?",
337                params![
338                    issue.content_hash,
339                    issue.title,
340                    issue.description,
341                    issue.design,
342                    issue.acceptance_criteria,
343                    issue.notes,
344                    issue.status.as_str(),
345                    issue.priority,
346                    issue.issue_type.as_str(),
347                    issue.assignee,
348                    issue.owner,
349                    issue.estimated_minutes,
350                    issue.updated_at.to_rfc3339(),
351                    issue.closed_at.map(|t| t.to_rfc3339()),
352                    issue.close_reason,
353                    issue.due_at.map(|t| t.to_rfc3339()),
354                    issue.defer_until.map(|t| t.to_rfc3339()),
355                    issue.external_ref,
356                    issue.source_system,
357                    issue.deleted_at.map(|t| t.to_rfc3339()),
358                    issue.deleted_by,
359                    issue.delete_reason,
360                    issue.compaction_level,
361                    issue.compacted_at.map(|t| t.to_rfc3339()),
362                    issue.compacted_at_commit,
363                    issue.original_size,
364                    issue.agent_state.map(|s| s.as_str()),
365                    issue.mol_type.map(|m| m.as_str()),
366                    issue.hook_bead,
367                    issue.role_bead,
368                    issue.rig,
369                    issue.last_activity.map(|t| t.to_rfc3339()),
370                    issue.pinned,
371                    issue.is_template,
372                    issue.ephemeral,
373                    issue.id,
374                ],
375            )?;
376
377            if rows == 0 {
378                return Err(StorageError::NotFound(issue.id.clone()));
379            }
380
381            // Update labels
382            conn.execute("DELETE FROM labels WHERE issue_id = ?", [&issue.id])?;
383            for label in &issue.labels {
384                conn.execute(
385                    "INSERT INTO labels (issue_id, label) VALUES (?, ?)",
386                    params![issue.id, label],
387                )?;
388            }
389
390            // Record event
391            Self::record_event(conn, &issue.id, EventType::Updated, &issue.created_by, None, None)?;
392
393            // Mark dirty
394            Self::mark_dirty_internal(conn, &issue.id)?;
395
396            Ok(())
397        })
398    }
399
400    fn close_issue(&self, id: &str, actor: &str, reason: Option<&str>) -> Result<()> {
401        self.with_conn(|conn| {
402            let now = Utc::now().to_rfc3339();
403            let rows = conn.execute(
404                "UPDATE issues SET status = 'closed', closed_at = ?, close_reason = ?, updated_at = ? WHERE id = ?",
405                params![now, reason, now, id],
406            )?;
407
408            if rows == 0 {
409                return Err(StorageError::NotFound(id.to_string()));
410            }
411
412            Self::record_event(conn, id, EventType::Closed, actor, None, reason)?;
413            Self::mark_dirty_internal(conn, id)?;
414
415            Ok(())
416        })
417    }
418
419    fn delete_issue(&self, id: &str, actor: &str, reason: Option<&str>) -> Result<()> {
420        self.with_conn(|conn| {
421            let now = Utc::now().to_rfc3339();
422            let rows = conn.execute(
423                "UPDATE issues SET status = 'tombstone', deleted_at = ?, deleted_by = ?, delete_reason = ?, updated_at = ? WHERE id = ?",
424                params![now, actor, reason, now, id],
425            )?;
426
427            if rows == 0 {
428                return Err(StorageError::NotFound(id.to_string()));
429            }
430
431            Self::record_event(conn, id, EventType::StatusChanged, actor, Some("open"), Some("tombstone"))?;
432            Self::mark_dirty_internal(conn, id)?;
433
434            Ok(())
435        })
436    }
437
438    fn search_issues(&self, filter: &IssueFilter) -> Result<Vec<Issue>> {
439        self.with_conn(|conn| {
440            let mut sql = String::from("SELECT * FROM issues WHERE 1=1");
441            let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
442
443            // Status filter
444            if let Some(ref status) = filter.status {
445                sql.push_str(" AND status = ?");
446                params.push(Box::new(status.as_str().to_string()));
447            }
448            if let Some(ref statuses) = filter.statuses {
449                let placeholders: Vec<_> = statuses.iter().map(|_| "?").collect();
450                sql.push_str(&format!(" AND status IN ({})", placeholders.join(",")));
451                for s in statuses {
452                    params.push(Box::new(s.as_str().to_string()));
453                }
454            }
455
456            // Exclude tombstones unless explicitly included
457            if !filter.include_tombstones {
458                sql.push_str(" AND status != 'tombstone'");
459            }
460
461            // Type filter
462            if let Some(ref issue_type) = filter.issue_type {
463                sql.push_str(" AND issue_type = ?");
464                params.push(Box::new(issue_type.as_str().to_string()));
465            }
466
467            // Assignee filter
468            if let Some(ref assignee) = filter.assignee {
469                sql.push_str(" AND assignee = ?");
470                params.push(Box::new(assignee.clone()));
471            }
472
473            // Priority filter
474            if let Some(priority) = filter.priority {
475                sql.push_str(" AND priority = ?");
476                params.push(Box::new(priority));
477            }
478
479            // Text search
480            if let Some(ref text) = filter.text_search {
481                sql.push_str(" AND (title LIKE ? OR description LIKE ? OR notes LIKE ?)");
482                let pattern = format!("%{}%", text);
483                params.push(Box::new(pattern.clone()));
484                params.push(Box::new(pattern.clone()));
485                params.push(Box::new(pattern));
486            }
487
488            // Sorting
489            sql.push_str(" ORDER BY ");
490            match filter.sort_by {
491                Some(crate::types::filter::SortField::Priority) => sql.push_str("priority"),
492                Some(crate::types::filter::SortField::Title) => sql.push_str("title"),
493                Some(crate::types::filter::SortField::UpdatedAt) => sql.push_str("updated_at"),
494                _ => sql.push_str("created_at"),
495            }
496            if filter.sort_desc {
497                sql.push_str(" DESC");
498            }
499
500            // Pagination
501            if let Some(limit) = filter.limit {
502                sql.push_str(&format!(" LIMIT {}", limit));
503            }
504            if let Some(offset) = filter.offset {
505                sql.push_str(&format!(" OFFSET {}", offset));
506            }
507
508            let mut stmt = conn.prepare(&sql)?;
509            let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
510
511            let issues: Vec<Issue> = stmt.query_map(params_refs.as_slice(), |row| row_to_issue(row))?
512                .filter_map(|r| r.ok())
513                .collect();
514
515            Ok(issues)
516        })
517    }
518
519    fn add_dependency(&self, dep: &Dependency) -> Result<()> {
520        // Check for cycles first
521        if dep.dep_type.check_cycles() && self.would_create_cycle(&dep.issue_id, &dep.depends_on_id, dep.dep_type)? {
522            return Err(StorageError::CycleDetected {
523                from: dep.issue_id.clone(),
524                to: dep.depends_on_id.clone(),
525            });
526        }
527
528        self.with_conn(|conn| {
529            conn.execute(
530                "INSERT OR REPLACE INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id)
531                 VALUES (?, ?, ?, ?, ?, ?, ?)",
532                params![
533                    dep.issue_id,
534                    dep.depends_on_id,
535                    dep.dep_type.as_str(),
536                    dep.created_at.to_rfc3339(),
537                    dep.created_by,
538                    dep.metadata,
539                    dep.thread_id,
540                ],
541            )?;
542
543            Self::record_event(
544                conn,
545                &dep.issue_id,
546                EventType::DependencyAdded,
547                dep.created_by.as_deref().unwrap_or("system"),
548                None,
549                Some(&dep.depends_on_id),
550            )?;
551
552            Self::mark_dirty_internal(conn, &dep.issue_id)?;
553
554            Ok(())
555        })
556    }
557
558    fn remove_dependency(&self, issue_id: &str, depends_on_id: &str) -> Result<()> {
559        self.with_conn(|conn| {
560            let rows = conn.execute(
561                "DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ?",
562                params![issue_id, depends_on_id],
563            )?;
564
565            if rows > 0 {
566                Self::record_event(conn, issue_id, EventType::DependencyRemoved, "system", Some(depends_on_id), None)?;
567                Self::mark_dirty_internal(conn, issue_id)?;
568            }
569
570            Ok(())
571        })
572    }
573
574    fn get_dependencies(&self, issue_id: &str) -> Result<Vec<Dependency>> {
575        self.with_conn(|conn| {
576            let mut stmt = conn.prepare(
577                "SELECT issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id
578                 FROM dependencies WHERE issue_id = ?"
579            )?;
580
581            let deps = stmt.query_map([issue_id], |row| row_to_dependency(row))?
582                .filter_map(|r| r.ok())
583                .collect();
584
585            Ok(deps)
586        })
587    }
588
589    fn get_dependents(&self, issue_id: &str) -> Result<Vec<Dependency>> {
590        self.with_conn(|conn| {
591            let mut stmt = conn.prepare(
592                "SELECT issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id
593                 FROM dependencies WHERE depends_on_id = ?"
594            )?;
595
596            let deps = stmt.query_map([issue_id], |row| row_to_dependency(row))?
597                .filter_map(|r| r.ok())
598                .collect();
599
600            Ok(deps)
601        })
602    }
603
604    fn would_create_cycle(&self, from_id: &str, to_id: &str, dep_type: DependencyType) -> Result<bool> {
605        if !dep_type.check_cycles() {
606            return Ok(false);
607        }
608
609        self.with_conn(|conn| {
610            // Use recursive CTE to detect cycles
611            let sql = r#"
612                WITH RECURSIVE reachable(id, depth) AS (
613                    SELECT ?, 0
614                    UNION
615                    SELECT d.depends_on_id, r.depth + 1
616                    FROM reachable r
617                    JOIN dependencies d ON d.issue_id = r.id
618                    WHERE r.depth < 100
619                      AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
620                )
621                SELECT 1 FROM reachable WHERE id = ? LIMIT 1
622            "#;
623
624            let exists: bool = conn.query_row(sql, params![to_id, from_id], |_| Ok(true))
625                .optional()?
626                .unwrap_or(false);
627
628            Ok(exists)
629        })
630    }
631
632    fn get_ready_work(&self) -> Result<Vec<Issue>> {
633        self.with_conn(|conn| {
634            let sql = r#"
635                SELECT i.* FROM issues i
636                WHERE i.status = 'open'
637                  AND i.deleted_at IS NULL
638                  AND NOT EXISTS (
639                    SELECT 1 FROM dependencies d
640                    JOIN issues blocker ON blocker.id = d.depends_on_id
641                    WHERE d.issue_id = i.id
642                      AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
643                      AND blocker.status NOT IN ('closed', 'tombstone')
644                  )
645                ORDER BY i.priority, i.created_at
646            "#;
647
648            let mut stmt = conn.prepare(sql)?;
649            let issues = stmt.query_map([], |row| row_to_issue(row))?
650                .filter_map(|r| r.ok())
651                .collect();
652
653            Ok(issues)
654        })
655    }
656
657    fn get_blocked_issues(&self) -> Result<Vec<BlockedIssue>> {
658        self.with_conn(|conn| {
659            let sql = r#"
660                SELECT i.*, COUNT(d.depends_on_id) as blocking_count,
661                       GROUP_CONCAT(d.depends_on_id) as blocking_ids
662                FROM issues i
663                JOIN dependencies d ON d.issue_id = i.id
664                JOIN issues blocker ON blocker.id = d.depends_on_id
665                WHERE i.status IN ('open', 'blocked')
666                  AND i.deleted_at IS NULL
667                  AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
668                  AND blocker.status NOT IN ('closed', 'tombstone')
669                GROUP BY i.id
670                ORDER BY blocking_count DESC, i.priority, i.created_at
671            "#;
672
673            let mut stmt = conn.prepare(sql)?;
674            let blocked = stmt.query_map([], |row| {
675                let issue = row_to_issue(row)?;
676                let blocking_count: usize = row.get("blocking_count")?;
677                let blocking_ids_str: String = row.get("blocking_ids")?;
678                let blocking_ids: Vec<String> = blocking_ids_str
679                    .split(',')
680                    .map(|s| s.to_string())
681                    .collect();
682
683                Ok(BlockedIssue {
684                    issue,
685                    blocking_count,
686                    blocking_ids,
687                })
688            })?
689            .filter_map(|r| r.ok())
690            .collect();
691
692            Ok(blocked)
693        })
694    }
695
696    fn is_blocked(&self, issue_id: &str) -> Result<bool> {
697        self.with_conn(|conn| {
698            let sql = r#"
699                SELECT 1 FROM dependencies d
700                JOIN issues blocker ON blocker.id = d.depends_on_id
701                WHERE d.issue_id = ?
702                  AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
703                  AND blocker.status NOT IN ('closed', 'tombstone')
704                LIMIT 1
705            "#;
706
707            let blocked: bool = conn.query_row(sql, [issue_id], |_| Ok(true))
708                .optional()?
709                .unwrap_or(false);
710
711            Ok(blocked)
712        })
713    }
714
715    fn add_label(&self, issue_id: &str, label: &str) -> Result<()> {
716        self.with_conn(|conn| {
717            conn.execute(
718                "INSERT OR IGNORE INTO labels (issue_id, label) VALUES (?, ?)",
719                params![issue_id, label],
720            )?;
721
722            Self::record_event(conn, issue_id, EventType::LabelAdded, "system", None, Some(label))?;
723            Self::mark_dirty_internal(conn, issue_id)?;
724
725            Ok(())
726        })
727    }
728
729    fn remove_label(&self, issue_id: &str, label: &str) -> Result<()> {
730        self.with_conn(|conn| {
731            let rows = conn.execute(
732                "DELETE FROM labels WHERE issue_id = ? AND label = ?",
733                params![issue_id, label],
734            )?;
735
736            if rows > 0 {
737                Self::record_event(conn, issue_id, EventType::LabelRemoved, "system", Some(label), None)?;
738                Self::mark_dirty_internal(conn, issue_id)?;
739            }
740
741            Ok(())
742        })
743    }
744
745    fn get_labels(&self, issue_id: &str) -> Result<Vec<String>> {
746        self.with_conn(|conn| {
747            let mut stmt = conn.prepare("SELECT label FROM labels WHERE issue_id = ?")?;
748            let labels = stmt.query_map([issue_id], |row| row.get(0))?
749                .filter_map(|r| r.ok())
750                .collect();
751            Ok(labels)
752        })
753    }
754
755    fn get_issues_by_label(&self, label: &str) -> Result<Vec<Issue>> {
756        self.with_conn(|conn| {
757            let sql = r#"
758                SELECT i.* FROM issues i
759                JOIN labels l ON l.issue_id = i.id
760                WHERE l.label = ?
761                  AND i.status != 'tombstone'
762                ORDER BY i.created_at DESC
763            "#;
764
765            let mut stmt = conn.prepare(sql)?;
766            let issues = stmt.query_map([label], |row| row_to_issue(row))?
767                .filter_map(|r| r.ok())
768                .collect();
769
770            Ok(issues)
771        })
772    }
773
774    fn add_comment(&self, issue_id: &str, author: &str, text: &str) -> Result<i64> {
775        self.with_conn(|conn| {
776            conn.execute(
777                "INSERT INTO comments (issue_id, author, text, created_at) VALUES (?, ?, ?, ?)",
778                params![issue_id, author, text, Utc::now().to_rfc3339()],
779            )?;
780
781            let id = conn.last_insert_rowid();
782
783            Self::record_event(conn, issue_id, EventType::Commented, author, None, None)?;
784            Self::mark_dirty_internal(conn, issue_id)?;
785
786            Ok(id)
787        })
788    }
789
790    fn get_comments(&self, issue_id: &str) -> Result<Vec<Comment>> {
791        self.with_conn(|conn| {
792            let mut stmt = conn.prepare(
793                "SELECT id, issue_id, author, text, created_at FROM comments WHERE issue_id = ? ORDER BY created_at"
794            )?;
795
796            let comments = stmt.query_map([issue_id], |row| {
797                Ok(Comment {
798                    id: row.get(0)?,
799                    issue_id: row.get(1)?,
800                    author: row.get(2)?,
801                    text: row.get(3)?,
802                    created_at: parse_datetime(&row.get::<_, String>(4)?),
803                })
804            })?
805            .filter_map(|r| r.ok())
806            .collect();
807
808            Ok(comments)
809        })
810    }
811
812    fn get_events(&self, issue_id: &str) -> Result<Vec<Event>> {
813        self.with_conn(|conn| {
814            let mut stmt = conn.prepare(
815                "SELECT id, issue_id, event_type, actor, old_value, new_value, comment, created_at
816                 FROM events WHERE issue_id = ? ORDER BY created_at"
817            )?;
818
819            let events = stmt.query_map([issue_id], |row| {
820                let event_type_str: String = row.get(2)?;
821                Ok(Event {
822                    id: row.get(0)?,
823                    issue_id: row.get(1)?,
824                    event_type: event_type_str.parse().unwrap_or(EventType::Updated),
825                    actor: row.get(3)?,
826                    old_value: row.get(4)?,
827                    new_value: row.get(5)?,
828                    comment: row.get(6)?,
829                    created_at: parse_datetime(&row.get::<_, String>(7)?),
830                })
831            })?
832            .filter_map(|r| r.ok())
833            .collect();
834
835            Ok(events)
836        })
837    }
838
839    fn set_config(&self, key: &str, value: &str) -> Result<()> {
840        self.with_conn(|conn| {
841            conn.execute(
842                "INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
843                params![key, value],
844            )?;
845            Ok(())
846        })
847    }
848
849    fn get_config(&self, key: &str) -> Result<Option<String>> {
850        self.with_conn(|conn| {
851            conn.query_row(
852                "SELECT value FROM config WHERE key = ?",
853                [key],
854                |row| row.get(0),
855            ).optional().map_err(|e| e.into())
856        })
857    }
858
859    fn delete_config(&self, key: &str) -> Result<()> {
860        self.with_conn(|conn| {
861            conn.execute("DELETE FROM config WHERE key = ?", [key])?;
862            Ok(())
863        })
864    }
865
866    fn get_all_config(&self) -> Result<HashMap<String, String>> {
867        self.with_conn(|conn| {
868            let mut stmt = conn.prepare("SELECT key, value FROM config")?;
869            let config = stmt.query_map([], |row| {
870                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
871            })?
872            .filter_map(|r| r.ok())
873            .collect();
874            Ok(config)
875        })
876    }
877
878    fn mark_dirty(&self, issue_id: &str) -> Result<()> {
879        self.with_conn(|conn| Self::mark_dirty_internal(conn, issue_id))
880    }
881
882    fn get_dirty_issues(&self) -> Result<Vec<String>> {
883        self.with_conn(|conn| {
884            let mut stmt = conn.prepare("SELECT issue_id FROM dirty_issues ORDER BY marked_at")?;
885            let ids = stmt.query_map([], |row| row.get(0))?
886                .filter_map(|r| r.ok())
887                .collect();
888            Ok(ids)
889        })
890    }
891
892    fn clear_dirty(&self, issue_ids: &[String]) -> Result<()> {
893        if issue_ids.is_empty() {
894            return Ok(());
895        }
896
897        self.with_conn(|conn| {
898            let placeholders: Vec<_> = issue_ids.iter().map(|_| "?").collect();
899            let sql = format!(
900                "DELETE FROM dirty_issues WHERE issue_id IN ({})",
901                placeholders.join(",")
902            );
903
904            let params: Vec<&dyn rusqlite::ToSql> = issue_ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
905            conn.execute(&sql, params.as_slice())?;
906            Ok(())
907        })
908    }
909
910    fn get_statistics(&self) -> Result<Statistics> {
911        self.with_conn(|conn| {
912            let total_issues: usize = conn.query_row(
913                "SELECT COUNT(*) FROM issues WHERE status != 'tombstone'",
914                [],
915                |row| row.get(0),
916            )?;
917
918            let open_issues: usize = conn.query_row(
919                "SELECT COUNT(*) FROM issues WHERE status = 'open'",
920                [],
921                |row| row.get(0),
922            )?;
923
924            let in_progress_issues: usize = conn.query_row(
925                "SELECT COUNT(*) FROM issues WHERE status = 'in_progress'",
926                [],
927                |row| row.get(0),
928            )?;
929
930            let blocked_issues: usize = conn.query_row(
931                "SELECT COUNT(*) FROM issues WHERE status = 'blocked'",
932                [],
933                |row| row.get(0),
934            )?;
935
936            let closed_issues: usize = conn.query_row(
937                "SELECT COUNT(*) FROM issues WHERE status = 'closed'",
938                [],
939                |row| row.get(0),
940            )?;
941
942            let total_dependencies: usize = conn.query_row(
943                "SELECT COUNT(*) FROM dependencies",
944                [],
945                |row| row.get(0),
946            )?;
947
948            // Count ready issues
949            let ready_issues: usize = conn.query_row(
950                r#"
951                SELECT COUNT(*) FROM issues i
952                WHERE i.status = 'open'
953                  AND i.deleted_at IS NULL
954                  AND NOT EXISTS (
955                    SELECT 1 FROM dependencies d
956                    JOIN issues blocker ON blocker.id = d.depends_on_id
957                    WHERE d.issue_id = i.id
958                      AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
959                      AND blocker.status NOT IN ('closed', 'tombstone')
960                  )
961                "#,
962                [],
963                |row| row.get(0),
964            )?;
965
966            Ok(Statistics {
967                total_issues,
968                open_issues,
969                in_progress_issues,
970                blocked_issues,
971                closed_issues,
972                ready_issues,
973                total_dependencies,
974            })
975        })
976    }
977
978    fn next_child_counter(&self, parent_id: &str) -> Result<u32> {
979        self.with_conn(|conn| {
980            conn.execute(
981                "INSERT INTO child_counters (parent_id, counter) VALUES (?, 1)
982                 ON CONFLICT(parent_id) DO UPDATE SET counter = counter + 1",
983                [parent_id],
984            )?;
985
986            let counter: u32 = conn.query_row(
987                "SELECT counter FROM child_counters WHERE parent_id = ?",
988                [parent_id],
989                |row| row.get(0),
990            )?;
991
992            Ok(counter)
993        })
994    }
995
996    fn transaction<F, T>(&self, f: F) -> Result<T>
997    where
998        F: FnOnce() -> Result<T>,
999    {
1000        self.with_conn_mut(|conn| {
1001            let tx = conn.transaction()?;
1002            let result = f();
1003            match result {
1004                Ok(v) => {
1005                    tx.commit()?;
1006                    Ok(v)
1007                }
1008                Err(e) => {
1009                    // Transaction will be rolled back when dropped
1010                    Err(e)
1011                }
1012            }
1013        })
1014    }
1015
1016    fn close(&self) -> Result<()> {
1017        self.closed.store(true, Ordering::Release);
1018        Ok(())
1019    }
1020}
1021
1022/// Parse a datetime string into DateTime<Utc>.
1023fn parse_datetime(s: &str) -> DateTime<Utc> {
1024    DateTime::parse_from_rfc3339(s)
1025        .map(|dt| dt.with_timezone(&Utc))
1026        .unwrap_or_else(|_| Utc::now())
1027}
1028
1029/// Convert a database row to an Issue.
1030fn row_to_issue(row: &rusqlite::Row) -> rusqlite::Result<Issue> {
1031    let status_str: String = row.get("status")?;
1032    let issue_type_str: String = row.get("issue_type")?;
1033    let agent_state_str: Option<String> = row.get("agent_state")?;
1034    let mol_type_str: Option<String> = row.get("mol_type")?;
1035
1036    Ok(Issue {
1037        id: row.get("id")?,
1038        content_hash: row.get("content_hash")?,
1039        title: row.get("title")?,
1040        description: row.get("description")?,
1041        design: row.get("design")?,
1042        acceptance_criteria: row.get("acceptance_criteria")?,
1043        notes: row.get("notes")?,
1044        status: status_str.parse().unwrap_or_default(),
1045        priority: row.get("priority")?,
1046        issue_type: issue_type_str.parse().unwrap_or_default(),
1047        assignee: row.get("assignee")?,
1048        owner: row.get("owner")?,
1049        estimated_minutes: row.get("estimated_minutes")?,
1050        created_at: parse_datetime(&row.get::<_, String>("created_at")?),
1051        created_by: row.get("created_by")?,
1052        updated_at: parse_datetime(&row.get::<_, String>("updated_at")?),
1053        closed_at: row.get::<_, Option<String>>("closed_at")?.map(|s| parse_datetime(&s)),
1054        close_reason: row.get("close_reason")?,
1055        due_at: row.get::<_, Option<String>>("due_at")?.map(|s| parse_datetime(&s)),
1056        defer_until: row.get::<_, Option<String>>("defer_until")?.map(|s| parse_datetime(&s)),
1057        external_ref: row.get("external_ref")?,
1058        source_system: row.get("source_system")?,
1059        labels: Vec::new(), // Loaded separately
1060        deleted_at: row.get::<_, Option<String>>("deleted_at")?.map(|s| parse_datetime(&s)),
1061        deleted_by: row.get("deleted_by")?,
1062        delete_reason: row.get("delete_reason")?,
1063        compaction_level: row.get("compaction_level")?,
1064        compacted_at: row.get::<_, Option<String>>("compacted_at")?.map(|s| parse_datetime(&s)),
1065        compacted_at_commit: row.get("compacted_at_commit")?,
1066        original_size: row.get("original_size")?,
1067        agent_state: agent_state_str.and_then(|s| s.parse().ok()),
1068        mol_type: mol_type_str.and_then(|s| s.parse().ok()),
1069        hook_bead: row.get("hook_bead")?,
1070        role_bead: row.get("role_bead")?,
1071        rig: row.get("rig")?,
1072        last_activity: row.get::<_, Option<String>>("last_activity")?.map(|s| parse_datetime(&s)),
1073        pinned: row.get::<_, i32>("pinned")? != 0,
1074        is_template: row.get::<_, i32>("is_template")? != 0,
1075        ephemeral: row.get::<_, i32>("ephemeral")? != 0,
1076    })
1077}
1078
1079/// Convert a database row to a Dependency.
1080fn row_to_dependency(row: &rusqlite::Row) -> rusqlite::Result<Dependency> {
1081    let dep_type_str: String = row.get(2)?;
1082    Ok(Dependency {
1083        issue_id: row.get(0)?,
1084        depends_on_id: row.get(1)?,
1085        dep_type: dep_type_str.parse().unwrap_or_default(),
1086        created_at: parse_datetime(&row.get::<_, String>(3)?),
1087        created_by: row.get(4)?,
1088        metadata: row.get(5)?,
1089        thread_id: row.get(6)?,
1090    })
1091}
1092
1093#[cfg(test)]
1094mod tests {
1095    use super::*;
1096
1097    #[test]
1098    fn test_create_and_get_issue() {
1099        let storage = SqliteStorage::in_memory().unwrap();
1100
1101        let issue = Issue::new("bd-a1b2", "Test task", "alice");
1102        storage.create_issue(&issue).unwrap();
1103
1104        let retrieved = storage.get_issue("bd-a1b2").unwrap().unwrap();
1105        assert_eq!(retrieved.id, "bd-a1b2");
1106        assert_eq!(retrieved.title, "Test task");
1107        assert_eq!(retrieved.created_by, "alice");
1108    }
1109
1110    #[test]
1111    fn test_dependency_cycle_detection() {
1112        let storage = SqliteStorage::in_memory().unwrap();
1113
1114        // Create issues
1115        storage.create_issue(&Issue::new("bd-1", "Task 1", "alice")).unwrap();
1116        storage.create_issue(&Issue::new("bd-2", "Task 2", "alice")).unwrap();
1117        storage.create_issue(&Issue::new("bd-3", "Task 3", "alice")).unwrap();
1118
1119        // Create chain: 1 -> 2 -> 3
1120        storage.add_dependency(&Dependency::blocks("bd-1", "bd-2")).unwrap();
1121        storage.add_dependency(&Dependency::blocks("bd-2", "bd-3")).unwrap();
1122
1123        // Try to create cycle: 3 -> 1 (should fail)
1124        let result = storage.add_dependency(&Dependency::blocks("bd-3", "bd-1"));
1125        assert!(matches!(result, Err(StorageError::CycleDetected { .. })));
1126    }
1127
1128    #[test]
1129    fn test_ready_work() {
1130        let storage = SqliteStorage::in_memory().unwrap();
1131
1132        // Create issues
1133        storage.create_issue(&Issue::new("bd-1", "Ready task", "alice")).unwrap();
1134        storage.create_issue(&Issue::new("bd-2", "Blocked task", "alice")).unwrap();
1135        storage.create_issue(&Issue::new("bd-3", "Blocker", "alice")).unwrap();
1136
1137        // bd-2 is blocked by bd-3
1138        storage.add_dependency(&Dependency::blocks("bd-2", "bd-3")).unwrap();
1139
1140        let ready = storage.get_ready_work().unwrap();
1141        assert_eq!(ready.len(), 2); // bd-1 and bd-3 are ready
1142
1143        // Close the blocker
1144        storage.close_issue("bd-3", "alice", None).unwrap();
1145
1146        let ready = storage.get_ready_work().unwrap();
1147        assert_eq!(ready.len(), 2); // Now bd-1 and bd-2 are ready
1148    }
1149
1150    #[test]
1151    fn test_labels() {
1152        let storage = SqliteStorage::in_memory().unwrap();
1153
1154        storage.create_issue(&Issue::new("bd-1", "Task", "alice")).unwrap();
1155
1156        storage.add_label("bd-1", "bug").unwrap();
1157        storage.add_label("bd-1", "urgent").unwrap();
1158
1159        let labels = storage.get_labels("bd-1").unwrap();
1160        assert!(labels.contains(&"bug".to_string()));
1161        assert!(labels.contains(&"urgent".to_string()));
1162
1163        let issues = storage.get_issues_by_label("bug").unwrap();
1164        assert_eq!(issues.len(), 1);
1165        assert_eq!(issues[0].id, "bd-1");
1166    }
1167}