1use std::collections::HashMap;
4use std::path::Path;
5use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
6
7use chrono::{DateTime, Utc};
8use rusqlite::{params, Connection, OptionalExtension};
9
10use super::schema::{SCHEMA, SCHEMA_VERSION, set_schema_version};
11use super::{Result, Storage, StorageError};
12use crate::types::{
13 AgentState, BlockedIssue, Comment, Dependency, DependencyType, Event,
14 EventType, Issue, IssueFilter, IssueType, MolType, Status, Statistics,
15};
16
17pub struct SqliteStorage {
19 conn: Arc<Mutex<Connection>>,
20 closed: AtomicBool,
21}
22
23impl SqliteStorage {
24 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
26 let conn = Connection::open(path)?;
27 Self::initialize(conn)
28 }
29
30 pub fn in_memory() -> Result<Self> {
32 let conn = Connection::open_in_memory()?;
33 Self::initialize(conn)
34 }
35
36 fn initialize(conn: Connection) -> Result<Self> {
37 conn.execute_batch(
39 "PRAGMA foreign_keys = ON;
40 PRAGMA journal_mode = WAL;
41 PRAGMA busy_timeout = 5000;
42 PRAGMA synchronous = NORMAL;"
43 )?;
44
45 conn.execute_batch(SCHEMA)?;
47
48 set_schema_version(&conn, SCHEMA_VERSION)?;
50
51 Ok(Self {
52 conn: Arc::new(Mutex::new(conn)),
53 closed: AtomicBool::new(false),
54 })
55 }
56
57 fn check_closed(&self) -> Result<()> {
58 if self.closed.load(Ordering::Acquire) {
59 return Err(StorageError::Closed);
60 }
61 Ok(())
62 }
63
64 fn with_conn<F, T>(&self, f: F) -> Result<T>
65 where
66 F: FnOnce(&Connection) -> Result<T>,
67 {
68 self.check_closed()?;
69 let conn = self.conn.lock().map_err(|e| StorageError::Other(e.to_string()))?;
70 f(&conn)
71 }
72
73 fn with_conn_mut<F, T>(&self, f: F) -> Result<T>
74 where
75 F: FnOnce(&mut Connection) -> Result<T>,
76 {
77 self.check_closed()?;
78 let mut conn = self.conn.lock().map_err(|e| StorageError::Other(e.to_string()))?;
79 f(&mut conn)
80 }
81
82 fn record_event(
84 conn: &Connection,
85 issue_id: &str,
86 event_type: EventType,
87 actor: &str,
88 old_value: Option<&str>,
89 new_value: Option<&str>,
90 ) -> Result<()> {
91 conn.execute(
92 "INSERT INTO events (issue_id, event_type, actor, old_value, new_value, created_at)
93 VALUES (?, ?, ?, ?, ?, ?)",
94 params![
95 issue_id,
96 event_type.as_str(),
97 actor,
98 old_value,
99 new_value,
100 Utc::now().to_rfc3339(),
101 ],
102 )?;
103 Ok(())
104 }
105
106 fn mark_dirty_internal(conn: &Connection, issue_id: &str) -> Result<()> {
108 conn.execute(
109 "INSERT OR REPLACE INTO dirty_issues (issue_id, marked_at) VALUES (?, ?)",
110 params![issue_id, Utc::now().to_rfc3339()],
111 )?;
112 Ok(())
113 }
114}
115
116impl Storage for SqliteStorage {
117 fn create_issue(&self, issue: &Issue) -> Result<()> {
118 self.with_conn(|conn| {
119 let exists: bool = conn.query_row(
121 "SELECT 1 FROM issues WHERE id = ?",
122 [&issue.id],
123 |_| Ok(true),
124 ).optional()?.unwrap_or(false);
125
126 if exists {
127 return Err(StorageError::AlreadyExists(issue.id.clone()));
128 }
129
130 conn.execute(
131 "INSERT INTO issues (
132 id, content_hash, title, description, design, acceptance_criteria, notes,
133 status, priority, issue_type, assignee, owner, estimated_minutes,
134 created_at, created_by, updated_at, closed_at, close_reason,
135 due_at, defer_until, external_ref, source_system,
136 deleted_at, deleted_by, delete_reason,
137 compaction_level, compacted_at, compacted_at_commit, original_size,
138 agent_state, mol_type, hook_bead, role_bead, rig, last_activity,
139 pinned, is_template, ephemeral
140 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
141 params![
142 issue.id,
143 issue.content_hash,
144 issue.title,
145 issue.description,
146 issue.design,
147 issue.acceptance_criteria,
148 issue.notes,
149 issue.status.as_str(),
150 issue.priority,
151 issue.issue_type.as_str(),
152 issue.assignee,
153 issue.owner,
154 issue.estimated_minutes,
155 issue.created_at.to_rfc3339(),
156 issue.created_by,
157 issue.updated_at.to_rfc3339(),
158 issue.closed_at.map(|t| t.to_rfc3339()),
159 issue.close_reason,
160 issue.due_at.map(|t| t.to_rfc3339()),
161 issue.defer_until.map(|t| t.to_rfc3339()),
162 issue.external_ref,
163 issue.source_system,
164 issue.deleted_at.map(|t| t.to_rfc3339()),
165 issue.deleted_by,
166 issue.delete_reason,
167 issue.compaction_level,
168 issue.compacted_at.map(|t| t.to_rfc3339()),
169 issue.compacted_at_commit,
170 issue.original_size,
171 issue.agent_state.map(|s| s.as_str()),
172 issue.mol_type.map(|m| m.as_str()),
173 issue.hook_bead,
174 issue.role_bead,
175 issue.rig,
176 issue.last_activity.map(|t| t.to_rfc3339()),
177 issue.pinned,
178 issue.is_template,
179 issue.ephemeral,
180 ],
181 )?;
182
183 for label in &issue.labels {
185 conn.execute(
186 "INSERT OR IGNORE INTO labels (issue_id, label) VALUES (?, ?)",
187 params![issue.id, label],
188 )?;
189 }
190
191 Self::record_event(conn, &issue.id, EventType::Created, &issue.created_by, None, None)?;
193
194 Self::mark_dirty_internal(conn, &issue.id)?;
196
197 Ok(())
198 })
199 }
200
201 fn create_issues(&self, issues: &[Issue]) -> Result<()> {
202 self.with_conn_mut(|conn| {
203 let tx = conn.transaction()?;
204 for issue in issues {
205 tx.execute(
207 "INSERT INTO issues (
208 id, content_hash, title, description, design, acceptance_criteria, notes,
209 status, priority, issue_type, assignee, owner, estimated_minutes,
210 created_at, created_by, updated_at, closed_at, close_reason,
211 due_at, defer_until, external_ref, source_system,
212 deleted_at, deleted_by, delete_reason,
213 compaction_level, compacted_at, compacted_at_commit, original_size,
214 agent_state, mol_type, hook_bead, role_bead, rig, last_activity,
215 pinned, is_template, ephemeral
216 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
217 params![
218 issue.id,
219 issue.content_hash,
220 issue.title,
221 issue.description,
222 issue.design,
223 issue.acceptance_criteria,
224 issue.notes,
225 issue.status.as_str(),
226 issue.priority,
227 issue.issue_type.as_str(),
228 issue.assignee,
229 issue.owner,
230 issue.estimated_minutes,
231 issue.created_at.to_rfc3339(),
232 issue.created_by,
233 issue.updated_at.to_rfc3339(),
234 issue.closed_at.map(|t| t.to_rfc3339()),
235 issue.close_reason,
236 issue.due_at.map(|t| t.to_rfc3339()),
237 issue.defer_until.map(|t| t.to_rfc3339()),
238 issue.external_ref,
239 issue.source_system,
240 issue.deleted_at.map(|t| t.to_rfc3339()),
241 issue.deleted_by,
242 issue.delete_reason,
243 issue.compaction_level,
244 issue.compacted_at.map(|t| t.to_rfc3339()),
245 issue.compacted_at_commit,
246 issue.original_size,
247 issue.agent_state.map(|s| s.as_str()),
248 issue.mol_type.map(|m| m.as_str()),
249 issue.hook_bead,
250 issue.role_bead,
251 issue.rig,
252 issue.last_activity.map(|t| t.to_rfc3339()),
253 issue.pinned,
254 issue.is_template,
255 issue.ephemeral,
256 ],
257 )?;
258
259 for label in &issue.labels {
261 tx.execute(
262 "INSERT OR IGNORE INTO labels (issue_id, label) VALUES (?, ?)",
263 params![issue.id, label],
264 )?;
265 }
266
267 tx.execute(
269 "INSERT OR REPLACE INTO dirty_issues (issue_id, marked_at) VALUES (?, ?)",
270 params![issue.id, Utc::now().to_rfc3339()],
271 )?;
272 }
273 tx.commit()?;
274 Ok(())
275 })
276 }
277
278 fn get_issue(&self, id: &str) -> Result<Option<Issue>> {
279 self.with_conn(|conn| {
280 let issue = conn.query_row(
281 "SELECT * FROM issues WHERE id = ?",
282 [id],
283 |row| row_to_issue(row),
284 ).optional()?;
285
286 if let Some(mut issue) = issue {
287 let mut stmt = conn.prepare("SELECT label FROM labels WHERE issue_id = ?")?;
289 let labels: Vec<String> = stmt.query_map([id], |row| row.get(0))?
290 .filter_map(|r| r.ok())
291 .collect();
292 issue.labels = labels;
293 Ok(Some(issue))
294 } else {
295 Ok(None)
296 }
297 })
298 }
299
300 fn get_issue_by_external_ref(&self, external_ref: &str) -> Result<Option<Issue>> {
301 self.with_conn(|conn| {
302 let issue = conn.query_row(
303 "SELECT * FROM issues WHERE external_ref = ?",
304 [external_ref],
305 |row| row_to_issue(row),
306 ).optional()?;
307
308 if let Some(mut issue) = issue {
309 let mut stmt = conn.prepare("SELECT label FROM labels WHERE issue_id = ?")?;
311 let labels: Vec<String> = stmt.query_map([&issue.id], |row| row.get(0))?
312 .filter_map(|r| r.ok())
313 .collect();
314 issue.labels = labels;
315 Ok(Some(issue))
316 } else {
317 Ok(None)
318 }
319 })
320 }
321
322 fn update_issue(&self, issue: &Issue) -> Result<()> {
323 self.with_conn(|conn| {
324 let rows = conn.execute(
325 "UPDATE issues SET
326 content_hash = ?, title = ?, description = ?, design = ?,
327 acceptance_criteria = ?, notes = ?, status = ?, priority = ?,
328 issue_type = ?, assignee = ?, owner = ?, estimated_minutes = ?,
329 updated_at = ?, closed_at = ?, close_reason = ?, due_at = ?,
330 defer_until = ?, external_ref = ?, source_system = ?,
331 deleted_at = ?, deleted_by = ?, delete_reason = ?,
332 compaction_level = ?, compacted_at = ?, compacted_at_commit = ?,
333 original_size = ?, agent_state = ?, mol_type = ?,
334 hook_bead = ?, role_bead = ?, rig = ?, last_activity = ?,
335 pinned = ?, is_template = ?, ephemeral = ?
336 WHERE id = ?",
337 params![
338 issue.content_hash,
339 issue.title,
340 issue.description,
341 issue.design,
342 issue.acceptance_criteria,
343 issue.notes,
344 issue.status.as_str(),
345 issue.priority,
346 issue.issue_type.as_str(),
347 issue.assignee,
348 issue.owner,
349 issue.estimated_minutes,
350 issue.updated_at.to_rfc3339(),
351 issue.closed_at.map(|t| t.to_rfc3339()),
352 issue.close_reason,
353 issue.due_at.map(|t| t.to_rfc3339()),
354 issue.defer_until.map(|t| t.to_rfc3339()),
355 issue.external_ref,
356 issue.source_system,
357 issue.deleted_at.map(|t| t.to_rfc3339()),
358 issue.deleted_by,
359 issue.delete_reason,
360 issue.compaction_level,
361 issue.compacted_at.map(|t| t.to_rfc3339()),
362 issue.compacted_at_commit,
363 issue.original_size,
364 issue.agent_state.map(|s| s.as_str()),
365 issue.mol_type.map(|m| m.as_str()),
366 issue.hook_bead,
367 issue.role_bead,
368 issue.rig,
369 issue.last_activity.map(|t| t.to_rfc3339()),
370 issue.pinned,
371 issue.is_template,
372 issue.ephemeral,
373 issue.id,
374 ],
375 )?;
376
377 if rows == 0 {
378 return Err(StorageError::NotFound(issue.id.clone()));
379 }
380
381 conn.execute("DELETE FROM labels WHERE issue_id = ?", [&issue.id])?;
383 for label in &issue.labels {
384 conn.execute(
385 "INSERT INTO labels (issue_id, label) VALUES (?, ?)",
386 params![issue.id, label],
387 )?;
388 }
389
390 Self::record_event(conn, &issue.id, EventType::Updated, &issue.created_by, None, None)?;
392
393 Self::mark_dirty_internal(conn, &issue.id)?;
395
396 Ok(())
397 })
398 }
399
400 fn close_issue(&self, id: &str, actor: &str, reason: Option<&str>) -> Result<()> {
401 self.with_conn(|conn| {
402 let now = Utc::now().to_rfc3339();
403 let rows = conn.execute(
404 "UPDATE issues SET status = 'closed', closed_at = ?, close_reason = ?, updated_at = ? WHERE id = ?",
405 params![now, reason, now, id],
406 )?;
407
408 if rows == 0 {
409 return Err(StorageError::NotFound(id.to_string()));
410 }
411
412 Self::record_event(conn, id, EventType::Closed, actor, None, reason)?;
413 Self::mark_dirty_internal(conn, id)?;
414
415 Ok(())
416 })
417 }
418
419 fn delete_issue(&self, id: &str, actor: &str, reason: Option<&str>) -> Result<()> {
420 self.with_conn(|conn| {
421 let now = Utc::now().to_rfc3339();
422 let rows = conn.execute(
423 "UPDATE issues SET status = 'tombstone', deleted_at = ?, deleted_by = ?, delete_reason = ?, updated_at = ? WHERE id = ?",
424 params![now, actor, reason, now, id],
425 )?;
426
427 if rows == 0 {
428 return Err(StorageError::NotFound(id.to_string()));
429 }
430
431 Self::record_event(conn, id, EventType::StatusChanged, actor, Some("open"), Some("tombstone"))?;
432 Self::mark_dirty_internal(conn, id)?;
433
434 Ok(())
435 })
436 }
437
438 fn search_issues(&self, filter: &IssueFilter) -> Result<Vec<Issue>> {
439 self.with_conn(|conn| {
440 let mut sql = String::from("SELECT * FROM issues WHERE 1=1");
441 let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
442
443 if let Some(ref status) = filter.status {
445 sql.push_str(" AND status = ?");
446 params.push(Box::new(status.as_str().to_string()));
447 }
448 if let Some(ref statuses) = filter.statuses {
449 let placeholders: Vec<_> = statuses.iter().map(|_| "?").collect();
450 sql.push_str(&format!(" AND status IN ({})", placeholders.join(",")));
451 for s in statuses {
452 params.push(Box::new(s.as_str().to_string()));
453 }
454 }
455
456 if !filter.include_tombstones {
458 sql.push_str(" AND status != 'tombstone'");
459 }
460
461 if let Some(ref issue_type) = filter.issue_type {
463 sql.push_str(" AND issue_type = ?");
464 params.push(Box::new(issue_type.as_str().to_string()));
465 }
466
467 if let Some(ref assignee) = filter.assignee {
469 sql.push_str(" AND assignee = ?");
470 params.push(Box::new(assignee.clone()));
471 }
472
473 if let Some(priority) = filter.priority {
475 sql.push_str(" AND priority = ?");
476 params.push(Box::new(priority));
477 }
478
479 if let Some(ref text) = filter.text_search {
481 sql.push_str(" AND (title LIKE ? OR description LIKE ? OR notes LIKE ?)");
482 let pattern = format!("%{}%", text);
483 params.push(Box::new(pattern.clone()));
484 params.push(Box::new(pattern.clone()));
485 params.push(Box::new(pattern));
486 }
487
488 sql.push_str(" ORDER BY ");
490 match filter.sort_by {
491 Some(crate::types::filter::SortField::Priority) => sql.push_str("priority"),
492 Some(crate::types::filter::SortField::Title) => sql.push_str("title"),
493 Some(crate::types::filter::SortField::UpdatedAt) => sql.push_str("updated_at"),
494 _ => sql.push_str("created_at"),
495 }
496 if filter.sort_desc {
497 sql.push_str(" DESC");
498 }
499
500 if let Some(limit) = filter.limit {
502 sql.push_str(&format!(" LIMIT {}", limit));
503 }
504 if let Some(offset) = filter.offset {
505 sql.push_str(&format!(" OFFSET {}", offset));
506 }
507
508 let mut stmt = conn.prepare(&sql)?;
509 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
510
511 let issues: Vec<Issue> = stmt.query_map(params_refs.as_slice(), |row| row_to_issue(row))?
512 .filter_map(|r| r.ok())
513 .collect();
514
515 Ok(issues)
516 })
517 }
518
519 fn add_dependency(&self, dep: &Dependency) -> Result<()> {
520 if dep.dep_type.check_cycles() && self.would_create_cycle(&dep.issue_id, &dep.depends_on_id, dep.dep_type)? {
522 return Err(StorageError::CycleDetected {
523 from: dep.issue_id.clone(),
524 to: dep.depends_on_id.clone(),
525 });
526 }
527
528 self.with_conn(|conn| {
529 conn.execute(
530 "INSERT OR REPLACE INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id)
531 VALUES (?, ?, ?, ?, ?, ?, ?)",
532 params![
533 dep.issue_id,
534 dep.depends_on_id,
535 dep.dep_type.as_str(),
536 dep.created_at.to_rfc3339(),
537 dep.created_by,
538 dep.metadata,
539 dep.thread_id,
540 ],
541 )?;
542
543 Self::record_event(
544 conn,
545 &dep.issue_id,
546 EventType::DependencyAdded,
547 dep.created_by.as_deref().unwrap_or("system"),
548 None,
549 Some(&dep.depends_on_id),
550 )?;
551
552 Self::mark_dirty_internal(conn, &dep.issue_id)?;
553
554 Ok(())
555 })
556 }
557
558 fn remove_dependency(&self, issue_id: &str, depends_on_id: &str) -> Result<()> {
559 self.with_conn(|conn| {
560 let rows = conn.execute(
561 "DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ?",
562 params![issue_id, depends_on_id],
563 )?;
564
565 if rows > 0 {
566 Self::record_event(conn, issue_id, EventType::DependencyRemoved, "system", Some(depends_on_id), None)?;
567 Self::mark_dirty_internal(conn, issue_id)?;
568 }
569
570 Ok(())
571 })
572 }
573
574 fn get_dependencies(&self, issue_id: &str) -> Result<Vec<Dependency>> {
575 self.with_conn(|conn| {
576 let mut stmt = conn.prepare(
577 "SELECT issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id
578 FROM dependencies WHERE issue_id = ?"
579 )?;
580
581 let deps = stmt.query_map([issue_id], |row| row_to_dependency(row))?
582 .filter_map(|r| r.ok())
583 .collect();
584
585 Ok(deps)
586 })
587 }
588
589 fn get_dependents(&self, issue_id: &str) -> Result<Vec<Dependency>> {
590 self.with_conn(|conn| {
591 let mut stmt = conn.prepare(
592 "SELECT issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id
593 FROM dependencies WHERE depends_on_id = ?"
594 )?;
595
596 let deps = stmt.query_map([issue_id], |row| row_to_dependency(row))?
597 .filter_map(|r| r.ok())
598 .collect();
599
600 Ok(deps)
601 })
602 }
603
604 fn would_create_cycle(&self, from_id: &str, to_id: &str, dep_type: DependencyType) -> Result<bool> {
605 if !dep_type.check_cycles() {
606 return Ok(false);
607 }
608
609 self.with_conn(|conn| {
610 let sql = r#"
612 WITH RECURSIVE reachable(id, depth) AS (
613 SELECT ?, 0
614 UNION
615 SELECT d.depends_on_id, r.depth + 1
616 FROM reachable r
617 JOIN dependencies d ON d.issue_id = r.id
618 WHERE r.depth < 100
619 AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
620 )
621 SELECT 1 FROM reachable WHERE id = ? LIMIT 1
622 "#;
623
624 let exists: bool = conn.query_row(sql, params![to_id, from_id], |_| Ok(true))
625 .optional()?
626 .unwrap_or(false);
627
628 Ok(exists)
629 })
630 }
631
632 fn get_ready_work(&self) -> Result<Vec<Issue>> {
633 self.with_conn(|conn| {
634 let sql = r#"
635 SELECT i.* FROM issues i
636 WHERE i.status = 'open'
637 AND i.deleted_at IS NULL
638 AND NOT EXISTS (
639 SELECT 1 FROM dependencies d
640 JOIN issues blocker ON blocker.id = d.depends_on_id
641 WHERE d.issue_id = i.id
642 AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
643 AND blocker.status NOT IN ('closed', 'tombstone')
644 )
645 ORDER BY i.priority, i.created_at
646 "#;
647
648 let mut stmt = conn.prepare(sql)?;
649 let issues = stmt.query_map([], |row| row_to_issue(row))?
650 .filter_map(|r| r.ok())
651 .collect();
652
653 Ok(issues)
654 })
655 }
656
657 fn get_blocked_issues(&self) -> Result<Vec<BlockedIssue>> {
658 self.with_conn(|conn| {
659 let sql = r#"
660 SELECT i.*, COUNT(d.depends_on_id) as blocking_count,
661 GROUP_CONCAT(d.depends_on_id) as blocking_ids
662 FROM issues i
663 JOIN dependencies d ON d.issue_id = i.id
664 JOIN issues blocker ON blocker.id = d.depends_on_id
665 WHERE i.status IN ('open', 'blocked')
666 AND i.deleted_at IS NULL
667 AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
668 AND blocker.status NOT IN ('closed', 'tombstone')
669 GROUP BY i.id
670 ORDER BY blocking_count DESC, i.priority, i.created_at
671 "#;
672
673 let mut stmt = conn.prepare(sql)?;
674 let blocked = stmt.query_map([], |row| {
675 let issue = row_to_issue(row)?;
676 let blocking_count: usize = row.get("blocking_count")?;
677 let blocking_ids_str: String = row.get("blocking_ids")?;
678 let blocking_ids: Vec<String> = blocking_ids_str
679 .split(',')
680 .map(|s| s.to_string())
681 .collect();
682
683 Ok(BlockedIssue {
684 issue,
685 blocking_count,
686 blocking_ids,
687 })
688 })?
689 .filter_map(|r| r.ok())
690 .collect();
691
692 Ok(blocked)
693 })
694 }
695
696 fn is_blocked(&self, issue_id: &str) -> Result<bool> {
697 self.with_conn(|conn| {
698 let sql = r#"
699 SELECT 1 FROM dependencies d
700 JOIN issues blocker ON blocker.id = d.depends_on_id
701 WHERE d.issue_id = ?
702 AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
703 AND blocker.status NOT IN ('closed', 'tombstone')
704 LIMIT 1
705 "#;
706
707 let blocked: bool = conn.query_row(sql, [issue_id], |_| Ok(true))
708 .optional()?
709 .unwrap_or(false);
710
711 Ok(blocked)
712 })
713 }
714
715 fn add_label(&self, issue_id: &str, label: &str) -> Result<()> {
716 self.with_conn(|conn| {
717 conn.execute(
718 "INSERT OR IGNORE INTO labels (issue_id, label) VALUES (?, ?)",
719 params![issue_id, label],
720 )?;
721
722 Self::record_event(conn, issue_id, EventType::LabelAdded, "system", None, Some(label))?;
723 Self::mark_dirty_internal(conn, issue_id)?;
724
725 Ok(())
726 })
727 }
728
729 fn remove_label(&self, issue_id: &str, label: &str) -> Result<()> {
730 self.with_conn(|conn| {
731 let rows = conn.execute(
732 "DELETE FROM labels WHERE issue_id = ? AND label = ?",
733 params![issue_id, label],
734 )?;
735
736 if rows > 0 {
737 Self::record_event(conn, issue_id, EventType::LabelRemoved, "system", Some(label), None)?;
738 Self::mark_dirty_internal(conn, issue_id)?;
739 }
740
741 Ok(())
742 })
743 }
744
745 fn get_labels(&self, issue_id: &str) -> Result<Vec<String>> {
746 self.with_conn(|conn| {
747 let mut stmt = conn.prepare("SELECT label FROM labels WHERE issue_id = ?")?;
748 let labels = stmt.query_map([issue_id], |row| row.get(0))?
749 .filter_map(|r| r.ok())
750 .collect();
751 Ok(labels)
752 })
753 }
754
755 fn get_issues_by_label(&self, label: &str) -> Result<Vec<Issue>> {
756 self.with_conn(|conn| {
757 let sql = r#"
758 SELECT i.* FROM issues i
759 JOIN labels l ON l.issue_id = i.id
760 WHERE l.label = ?
761 AND i.status != 'tombstone'
762 ORDER BY i.created_at DESC
763 "#;
764
765 let mut stmt = conn.prepare(sql)?;
766 let issues = stmt.query_map([label], |row| row_to_issue(row))?
767 .filter_map(|r| r.ok())
768 .collect();
769
770 Ok(issues)
771 })
772 }
773
774 fn add_comment(&self, issue_id: &str, author: &str, text: &str) -> Result<i64> {
775 self.with_conn(|conn| {
776 conn.execute(
777 "INSERT INTO comments (issue_id, author, text, created_at) VALUES (?, ?, ?, ?)",
778 params![issue_id, author, text, Utc::now().to_rfc3339()],
779 )?;
780
781 let id = conn.last_insert_rowid();
782
783 Self::record_event(conn, issue_id, EventType::Commented, author, None, None)?;
784 Self::mark_dirty_internal(conn, issue_id)?;
785
786 Ok(id)
787 })
788 }
789
790 fn get_comments(&self, issue_id: &str) -> Result<Vec<Comment>> {
791 self.with_conn(|conn| {
792 let mut stmt = conn.prepare(
793 "SELECT id, issue_id, author, text, created_at FROM comments WHERE issue_id = ? ORDER BY created_at"
794 )?;
795
796 let comments = stmt.query_map([issue_id], |row| {
797 Ok(Comment {
798 id: row.get(0)?,
799 issue_id: row.get(1)?,
800 author: row.get(2)?,
801 text: row.get(3)?,
802 created_at: parse_datetime(&row.get::<_, String>(4)?),
803 })
804 })?
805 .filter_map(|r| r.ok())
806 .collect();
807
808 Ok(comments)
809 })
810 }
811
812 fn get_events(&self, issue_id: &str) -> Result<Vec<Event>> {
813 self.with_conn(|conn| {
814 let mut stmt = conn.prepare(
815 "SELECT id, issue_id, event_type, actor, old_value, new_value, comment, created_at
816 FROM events WHERE issue_id = ? ORDER BY created_at"
817 )?;
818
819 let events = stmt.query_map([issue_id], |row| {
820 let event_type_str: String = row.get(2)?;
821 Ok(Event {
822 id: row.get(0)?,
823 issue_id: row.get(1)?,
824 event_type: event_type_str.parse().unwrap_or(EventType::Updated),
825 actor: row.get(3)?,
826 old_value: row.get(4)?,
827 new_value: row.get(5)?,
828 comment: row.get(6)?,
829 created_at: parse_datetime(&row.get::<_, String>(7)?),
830 })
831 })?
832 .filter_map(|r| r.ok())
833 .collect();
834
835 Ok(events)
836 })
837 }
838
839 fn set_config(&self, key: &str, value: &str) -> Result<()> {
840 self.with_conn(|conn| {
841 conn.execute(
842 "INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
843 params![key, value],
844 )?;
845 Ok(())
846 })
847 }
848
849 fn get_config(&self, key: &str) -> Result<Option<String>> {
850 self.with_conn(|conn| {
851 conn.query_row(
852 "SELECT value FROM config WHERE key = ?",
853 [key],
854 |row| row.get(0),
855 ).optional().map_err(|e| e.into())
856 })
857 }
858
859 fn delete_config(&self, key: &str) -> Result<()> {
860 self.with_conn(|conn| {
861 conn.execute("DELETE FROM config WHERE key = ?", [key])?;
862 Ok(())
863 })
864 }
865
866 fn get_all_config(&self) -> Result<HashMap<String, String>> {
867 self.with_conn(|conn| {
868 let mut stmt = conn.prepare("SELECT key, value FROM config")?;
869 let config = stmt.query_map([], |row| {
870 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
871 })?
872 .filter_map(|r| r.ok())
873 .collect();
874 Ok(config)
875 })
876 }
877
878 fn mark_dirty(&self, issue_id: &str) -> Result<()> {
879 self.with_conn(|conn| Self::mark_dirty_internal(conn, issue_id))
880 }
881
882 fn get_dirty_issues(&self) -> Result<Vec<String>> {
883 self.with_conn(|conn| {
884 let mut stmt = conn.prepare("SELECT issue_id FROM dirty_issues ORDER BY marked_at")?;
885 let ids = stmt.query_map([], |row| row.get(0))?
886 .filter_map(|r| r.ok())
887 .collect();
888 Ok(ids)
889 })
890 }
891
892 fn clear_dirty(&self, issue_ids: &[String]) -> Result<()> {
893 if issue_ids.is_empty() {
894 return Ok(());
895 }
896
897 self.with_conn(|conn| {
898 let placeholders: Vec<_> = issue_ids.iter().map(|_| "?").collect();
899 let sql = format!(
900 "DELETE FROM dirty_issues WHERE issue_id IN ({})",
901 placeholders.join(",")
902 );
903
904 let params: Vec<&dyn rusqlite::ToSql> = issue_ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
905 conn.execute(&sql, params.as_slice())?;
906 Ok(())
907 })
908 }
909
910 fn get_statistics(&self) -> Result<Statistics> {
911 self.with_conn(|conn| {
912 let total_issues: usize = conn.query_row(
913 "SELECT COUNT(*) FROM issues WHERE status != 'tombstone'",
914 [],
915 |row| row.get(0),
916 )?;
917
918 let open_issues: usize = conn.query_row(
919 "SELECT COUNT(*) FROM issues WHERE status = 'open'",
920 [],
921 |row| row.get(0),
922 )?;
923
924 let in_progress_issues: usize = conn.query_row(
925 "SELECT COUNT(*) FROM issues WHERE status = 'in_progress'",
926 [],
927 |row| row.get(0),
928 )?;
929
930 let blocked_issues: usize = conn.query_row(
931 "SELECT COUNT(*) FROM issues WHERE status = 'blocked'",
932 [],
933 |row| row.get(0),
934 )?;
935
936 let closed_issues: usize = conn.query_row(
937 "SELECT COUNT(*) FROM issues WHERE status = 'closed'",
938 [],
939 |row| row.get(0),
940 )?;
941
942 let total_dependencies: usize = conn.query_row(
943 "SELECT COUNT(*) FROM dependencies",
944 [],
945 |row| row.get(0),
946 )?;
947
948 let ready_issues: usize = conn.query_row(
950 r#"
951 SELECT COUNT(*) FROM issues i
952 WHERE i.status = 'open'
953 AND i.deleted_at IS NULL
954 AND NOT EXISTS (
955 SELECT 1 FROM dependencies d
956 JOIN issues blocker ON blocker.id = d.depends_on_id
957 WHERE d.issue_id = i.id
958 AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
959 AND blocker.status NOT IN ('closed', 'tombstone')
960 )
961 "#,
962 [],
963 |row| row.get(0),
964 )?;
965
966 Ok(Statistics {
967 total_issues,
968 open_issues,
969 in_progress_issues,
970 blocked_issues,
971 closed_issues,
972 ready_issues,
973 total_dependencies,
974 })
975 })
976 }
977
978 fn next_child_counter(&self, parent_id: &str) -> Result<u32> {
979 self.with_conn(|conn| {
980 conn.execute(
981 "INSERT INTO child_counters (parent_id, counter) VALUES (?, 1)
982 ON CONFLICT(parent_id) DO UPDATE SET counter = counter + 1",
983 [parent_id],
984 )?;
985
986 let counter: u32 = conn.query_row(
987 "SELECT counter FROM child_counters WHERE parent_id = ?",
988 [parent_id],
989 |row| row.get(0),
990 )?;
991
992 Ok(counter)
993 })
994 }
995
996 fn transaction<F, T>(&self, f: F) -> Result<T>
997 where
998 F: FnOnce() -> Result<T>,
999 {
1000 self.with_conn_mut(|conn| {
1001 let tx = conn.transaction()?;
1002 let result = f();
1003 match result {
1004 Ok(v) => {
1005 tx.commit()?;
1006 Ok(v)
1007 }
1008 Err(e) => {
1009 Err(e)
1011 }
1012 }
1013 })
1014 }
1015
1016 fn close(&self) -> Result<()> {
1017 self.closed.store(true, Ordering::Release);
1018 Ok(())
1019 }
1020}
1021
1022fn parse_datetime(s: &str) -> DateTime<Utc> {
1024 DateTime::parse_from_rfc3339(s)
1025 .map(|dt| dt.with_timezone(&Utc))
1026 .unwrap_or_else(|_| Utc::now())
1027}
1028
1029fn row_to_issue(row: &rusqlite::Row) -> rusqlite::Result<Issue> {
1031 let status_str: String = row.get("status")?;
1032 let issue_type_str: String = row.get("issue_type")?;
1033 let agent_state_str: Option<String> = row.get("agent_state")?;
1034 let mol_type_str: Option<String> = row.get("mol_type")?;
1035
1036 Ok(Issue {
1037 id: row.get("id")?,
1038 content_hash: row.get("content_hash")?,
1039 title: row.get("title")?,
1040 description: row.get("description")?,
1041 design: row.get("design")?,
1042 acceptance_criteria: row.get("acceptance_criteria")?,
1043 notes: row.get("notes")?,
1044 status: status_str.parse().unwrap_or_default(),
1045 priority: row.get("priority")?,
1046 issue_type: issue_type_str.parse().unwrap_or_default(),
1047 assignee: row.get("assignee")?,
1048 owner: row.get("owner")?,
1049 estimated_minutes: row.get("estimated_minutes")?,
1050 created_at: parse_datetime(&row.get::<_, String>("created_at")?),
1051 created_by: row.get("created_by")?,
1052 updated_at: parse_datetime(&row.get::<_, String>("updated_at")?),
1053 closed_at: row.get::<_, Option<String>>("closed_at")?.map(|s| parse_datetime(&s)),
1054 close_reason: row.get("close_reason")?,
1055 due_at: row.get::<_, Option<String>>("due_at")?.map(|s| parse_datetime(&s)),
1056 defer_until: row.get::<_, Option<String>>("defer_until")?.map(|s| parse_datetime(&s)),
1057 external_ref: row.get("external_ref")?,
1058 source_system: row.get("source_system")?,
1059 labels: Vec::new(), deleted_at: row.get::<_, Option<String>>("deleted_at")?.map(|s| parse_datetime(&s)),
1061 deleted_by: row.get("deleted_by")?,
1062 delete_reason: row.get("delete_reason")?,
1063 compaction_level: row.get("compaction_level")?,
1064 compacted_at: row.get::<_, Option<String>>("compacted_at")?.map(|s| parse_datetime(&s)),
1065 compacted_at_commit: row.get("compacted_at_commit")?,
1066 original_size: row.get("original_size")?,
1067 agent_state: agent_state_str.and_then(|s| s.parse().ok()),
1068 mol_type: mol_type_str.and_then(|s| s.parse().ok()),
1069 hook_bead: row.get("hook_bead")?,
1070 role_bead: row.get("role_bead")?,
1071 rig: row.get("rig")?,
1072 last_activity: row.get::<_, Option<String>>("last_activity")?.map(|s| parse_datetime(&s)),
1073 pinned: row.get::<_, i32>("pinned")? != 0,
1074 is_template: row.get::<_, i32>("is_template")? != 0,
1075 ephemeral: row.get::<_, i32>("ephemeral")? != 0,
1076 })
1077}
1078
1079fn row_to_dependency(row: &rusqlite::Row) -> rusqlite::Result<Dependency> {
1081 let dep_type_str: String = row.get(2)?;
1082 Ok(Dependency {
1083 issue_id: row.get(0)?,
1084 depends_on_id: row.get(1)?,
1085 dep_type: dep_type_str.parse().unwrap_or_default(),
1086 created_at: parse_datetime(&row.get::<_, String>(3)?),
1087 created_by: row.get(4)?,
1088 metadata: row.get(5)?,
1089 thread_id: row.get(6)?,
1090 })
1091}
1092
1093#[cfg(test)]
1094mod tests {
1095 use super::*;
1096
1097 #[test]
1098 fn test_create_and_get_issue() {
1099 let storage = SqliteStorage::in_memory().unwrap();
1100
1101 let issue = Issue::new("bd-a1b2", "Test task", "alice");
1102 storage.create_issue(&issue).unwrap();
1103
1104 let retrieved = storage.get_issue("bd-a1b2").unwrap().unwrap();
1105 assert_eq!(retrieved.id, "bd-a1b2");
1106 assert_eq!(retrieved.title, "Test task");
1107 assert_eq!(retrieved.created_by, "alice");
1108 }
1109
1110 #[test]
1111 fn test_dependency_cycle_detection() {
1112 let storage = SqliteStorage::in_memory().unwrap();
1113
1114 storage.create_issue(&Issue::new("bd-1", "Task 1", "alice")).unwrap();
1116 storage.create_issue(&Issue::new("bd-2", "Task 2", "alice")).unwrap();
1117 storage.create_issue(&Issue::new("bd-3", "Task 3", "alice")).unwrap();
1118
1119 storage.add_dependency(&Dependency::blocks("bd-1", "bd-2")).unwrap();
1121 storage.add_dependency(&Dependency::blocks("bd-2", "bd-3")).unwrap();
1122
1123 let result = storage.add_dependency(&Dependency::blocks("bd-3", "bd-1"));
1125 assert!(matches!(result, Err(StorageError::CycleDetected { .. })));
1126 }
1127
1128 #[test]
1129 fn test_ready_work() {
1130 let storage = SqliteStorage::in_memory().unwrap();
1131
1132 storage.create_issue(&Issue::new("bd-1", "Ready task", "alice")).unwrap();
1134 storage.create_issue(&Issue::new("bd-2", "Blocked task", "alice")).unwrap();
1135 storage.create_issue(&Issue::new("bd-3", "Blocker", "alice")).unwrap();
1136
1137 storage.add_dependency(&Dependency::blocks("bd-2", "bd-3")).unwrap();
1139
1140 let ready = storage.get_ready_work().unwrap();
1141 assert_eq!(ready.len(), 2); storage.close_issue("bd-3", "alice", None).unwrap();
1145
1146 let ready = storage.get_ready_work().unwrap();
1147 assert_eq!(ready.len(), 2); }
1149
1150 #[test]
1151 fn test_labels() {
1152 let storage = SqliteStorage::in_memory().unwrap();
1153
1154 storage.create_issue(&Issue::new("bd-1", "Task", "alice")).unwrap();
1155
1156 storage.add_label("bd-1", "bug").unwrap();
1157 storage.add_label("bd-1", "urgent").unwrap();
1158
1159 let labels = storage.get_labels("bd-1").unwrap();
1160 assert!(labels.contains(&"bug".to_string()));
1161 assert!(labels.contains(&"urgent".to_string()));
1162
1163 let issues = storage.get_issues_by_label("bug").unwrap();
1164 assert_eq!(issues.len(), 1);
1165 assert_eq!(issues[0].id, "bd-1");
1166 }
1167}