use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
use chrono::{DateTime, Utc};
use rusqlite::{params, Connection, OptionalExtension};
use super::schema::{SCHEMA, SCHEMA_VERSION, set_schema_version};
use super::{Result, Storage, StorageError};
use crate::types::{
AgentState, BlockedIssue, Comment, Dependency, DependencyType, Event,
EventType, Issue, IssueFilter, IssueType, MolType, Status, Statistics,
};
pub struct SqliteStorage {
conn: Arc<Mutex<Connection>>,
closed: AtomicBool,
}
impl SqliteStorage {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let conn = Connection::open(path)?;
Self::initialize(conn)
}
pub fn in_memory() -> Result<Self> {
let conn = Connection::open_in_memory()?;
Self::initialize(conn)
}
fn initialize(conn: Connection) -> Result<Self> {
conn.execute_batch(
"PRAGMA foreign_keys = ON;
PRAGMA journal_mode = WAL;
PRAGMA busy_timeout = 5000;
PRAGMA synchronous = NORMAL;"
)?;
conn.execute_batch(SCHEMA)?;
set_schema_version(&conn, SCHEMA_VERSION)?;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
closed: AtomicBool::new(false),
})
}
fn check_closed(&self) -> Result<()> {
if self.closed.load(Ordering::Acquire) {
return Err(StorageError::Closed);
}
Ok(())
}
fn with_conn<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(&Connection) -> Result<T>,
{
self.check_closed()?;
let conn = self.conn.lock().map_err(|e| StorageError::Other(e.to_string()))?;
f(&conn)
}
fn with_conn_mut<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(&mut Connection) -> Result<T>,
{
self.check_closed()?;
let mut conn = self.conn.lock().map_err(|e| StorageError::Other(e.to_string()))?;
f(&mut conn)
}
fn record_event(
conn: &Connection,
issue_id: &str,
event_type: EventType,
actor: &str,
old_value: Option<&str>,
new_value: Option<&str>,
) -> Result<()> {
conn.execute(
"INSERT INTO events (issue_id, event_type, actor, old_value, new_value, created_at)
VALUES (?, ?, ?, ?, ?, ?)",
params![
issue_id,
event_type.as_str(),
actor,
old_value,
new_value,
Utc::now().to_rfc3339(),
],
)?;
Ok(())
}
fn mark_dirty_internal(conn: &Connection, issue_id: &str) -> Result<()> {
conn.execute(
"INSERT OR REPLACE INTO dirty_issues (issue_id, marked_at) VALUES (?, ?)",
params![issue_id, Utc::now().to_rfc3339()],
)?;
Ok(())
}
}
impl Storage for SqliteStorage {
fn create_issue(&self, issue: &Issue) -> Result<()> {
self.with_conn(|conn| {
let exists: bool = conn.query_row(
"SELECT 1 FROM issues WHERE id = ?",
[&issue.id],
|_| Ok(true),
).optional()?.unwrap_or(false);
if exists {
return Err(StorageError::AlreadyExists(issue.id.clone()));
}
conn.execute(
"INSERT INTO issues (
id, content_hash, title, description, design, acceptance_criteria, notes,
status, priority, issue_type, assignee, owner, estimated_minutes,
created_at, created_by, updated_at, closed_at, close_reason,
due_at, defer_until, external_ref, source_system,
deleted_at, deleted_by, delete_reason,
compaction_level, compacted_at, compacted_at_commit, original_size,
agent_state, mol_type, hook_bead, role_bead, rig, last_activity,
pinned, is_template, ephemeral
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
params![
issue.id,
issue.content_hash,
issue.title,
issue.description,
issue.design,
issue.acceptance_criteria,
issue.notes,
issue.status.as_str(),
issue.priority,
issue.issue_type.as_str(),
issue.assignee,
issue.owner,
issue.estimated_minutes,
issue.created_at.to_rfc3339(),
issue.created_by,
issue.updated_at.to_rfc3339(),
issue.closed_at.map(|t| t.to_rfc3339()),
issue.close_reason,
issue.due_at.map(|t| t.to_rfc3339()),
issue.defer_until.map(|t| t.to_rfc3339()),
issue.external_ref,
issue.source_system,
issue.deleted_at.map(|t| t.to_rfc3339()),
issue.deleted_by,
issue.delete_reason,
issue.compaction_level,
issue.compacted_at.map(|t| t.to_rfc3339()),
issue.compacted_at_commit,
issue.original_size,
issue.agent_state.map(|s| s.as_str()),
issue.mol_type.map(|m| m.as_str()),
issue.hook_bead,
issue.role_bead,
issue.rig,
issue.last_activity.map(|t| t.to_rfc3339()),
issue.pinned,
issue.is_template,
issue.ephemeral,
],
)?;
for label in &issue.labels {
conn.execute(
"INSERT OR IGNORE INTO labels (issue_id, label) VALUES (?, ?)",
params![issue.id, label],
)?;
}
Self::record_event(conn, &issue.id, EventType::Created, &issue.created_by, None, None)?;
Self::mark_dirty_internal(conn, &issue.id)?;
Ok(())
})
}
fn create_issues(&self, issues: &[Issue]) -> Result<()> {
self.with_conn_mut(|conn| {
let tx = conn.transaction()?;
for issue in issues {
tx.execute(
"INSERT INTO issues (
id, content_hash, title, description, design, acceptance_criteria, notes,
status, priority, issue_type, assignee, owner, estimated_minutes,
created_at, created_by, updated_at, closed_at, close_reason,
due_at, defer_until, external_ref, source_system,
deleted_at, deleted_by, delete_reason,
compaction_level, compacted_at, compacted_at_commit, original_size,
agent_state, mol_type, hook_bead, role_bead, rig, last_activity,
pinned, is_template, ephemeral
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
params![
issue.id,
issue.content_hash,
issue.title,
issue.description,
issue.design,
issue.acceptance_criteria,
issue.notes,
issue.status.as_str(),
issue.priority,
issue.issue_type.as_str(),
issue.assignee,
issue.owner,
issue.estimated_minutes,
issue.created_at.to_rfc3339(),
issue.created_by,
issue.updated_at.to_rfc3339(),
issue.closed_at.map(|t| t.to_rfc3339()),
issue.close_reason,
issue.due_at.map(|t| t.to_rfc3339()),
issue.defer_until.map(|t| t.to_rfc3339()),
issue.external_ref,
issue.source_system,
issue.deleted_at.map(|t| t.to_rfc3339()),
issue.deleted_by,
issue.delete_reason,
issue.compaction_level,
issue.compacted_at.map(|t| t.to_rfc3339()),
issue.compacted_at_commit,
issue.original_size,
issue.agent_state.map(|s| s.as_str()),
issue.mol_type.map(|m| m.as_str()),
issue.hook_bead,
issue.role_bead,
issue.rig,
issue.last_activity.map(|t| t.to_rfc3339()),
issue.pinned,
issue.is_template,
issue.ephemeral,
],
)?;
for label in &issue.labels {
tx.execute(
"INSERT OR IGNORE INTO labels (issue_id, label) VALUES (?, ?)",
params![issue.id, label],
)?;
}
tx.execute(
"INSERT OR REPLACE INTO dirty_issues (issue_id, marked_at) VALUES (?, ?)",
params![issue.id, Utc::now().to_rfc3339()],
)?;
}
tx.commit()?;
Ok(())
})
}
fn get_issue(&self, id: &str) -> Result<Option<Issue>> {
self.with_conn(|conn| {
let issue = conn.query_row(
"SELECT * FROM issues WHERE id = ?",
[id],
|row| row_to_issue(row),
).optional()?;
if let Some(mut issue) = issue {
let mut stmt = conn.prepare("SELECT label FROM labels WHERE issue_id = ?")?;
let labels: Vec<String> = stmt.query_map([id], |row| row.get(0))?
.filter_map(|r| r.ok())
.collect();
issue.labels = labels;
Ok(Some(issue))
} else {
Ok(None)
}
})
}
fn get_issue_by_external_ref(&self, external_ref: &str) -> Result<Option<Issue>> {
self.with_conn(|conn| {
let issue = conn.query_row(
"SELECT * FROM issues WHERE external_ref = ?",
[external_ref],
|row| row_to_issue(row),
).optional()?;
if let Some(mut issue) = issue {
let mut stmt = conn.prepare("SELECT label FROM labels WHERE issue_id = ?")?;
let labels: Vec<String> = stmt.query_map([&issue.id], |row| row.get(0))?
.filter_map(|r| r.ok())
.collect();
issue.labels = labels;
Ok(Some(issue))
} else {
Ok(None)
}
})
}
fn update_issue(&self, issue: &Issue) -> Result<()> {
self.with_conn(|conn| {
let rows = conn.execute(
"UPDATE issues SET
content_hash = ?, title = ?, description = ?, design = ?,
acceptance_criteria = ?, notes = ?, status = ?, priority = ?,
issue_type = ?, assignee = ?, owner = ?, estimated_minutes = ?,
updated_at = ?, closed_at = ?, close_reason = ?, due_at = ?,
defer_until = ?, external_ref = ?, source_system = ?,
deleted_at = ?, deleted_by = ?, delete_reason = ?,
compaction_level = ?, compacted_at = ?, compacted_at_commit = ?,
original_size = ?, agent_state = ?, mol_type = ?,
hook_bead = ?, role_bead = ?, rig = ?, last_activity = ?,
pinned = ?, is_template = ?, ephemeral = ?
WHERE id = ?",
params![
issue.content_hash,
issue.title,
issue.description,
issue.design,
issue.acceptance_criteria,
issue.notes,
issue.status.as_str(),
issue.priority,
issue.issue_type.as_str(),
issue.assignee,
issue.owner,
issue.estimated_minutes,
issue.updated_at.to_rfc3339(),
issue.closed_at.map(|t| t.to_rfc3339()),
issue.close_reason,
issue.due_at.map(|t| t.to_rfc3339()),
issue.defer_until.map(|t| t.to_rfc3339()),
issue.external_ref,
issue.source_system,
issue.deleted_at.map(|t| t.to_rfc3339()),
issue.deleted_by,
issue.delete_reason,
issue.compaction_level,
issue.compacted_at.map(|t| t.to_rfc3339()),
issue.compacted_at_commit,
issue.original_size,
issue.agent_state.map(|s| s.as_str()),
issue.mol_type.map(|m| m.as_str()),
issue.hook_bead,
issue.role_bead,
issue.rig,
issue.last_activity.map(|t| t.to_rfc3339()),
issue.pinned,
issue.is_template,
issue.ephemeral,
issue.id,
],
)?;
if rows == 0 {
return Err(StorageError::NotFound(issue.id.clone()));
}
conn.execute("DELETE FROM labels WHERE issue_id = ?", [&issue.id])?;
for label in &issue.labels {
conn.execute(
"INSERT INTO labels (issue_id, label) VALUES (?, ?)",
params![issue.id, label],
)?;
}
Self::record_event(conn, &issue.id, EventType::Updated, &issue.created_by, None, None)?;
Self::mark_dirty_internal(conn, &issue.id)?;
Ok(())
})
}
fn close_issue(&self, id: &str, actor: &str, reason: Option<&str>) -> Result<()> {
self.with_conn(|conn| {
let now = Utc::now().to_rfc3339();
let rows = conn.execute(
"UPDATE issues SET status = 'closed', closed_at = ?, close_reason = ?, updated_at = ? WHERE id = ?",
params![now, reason, now, id],
)?;
if rows == 0 {
return Err(StorageError::NotFound(id.to_string()));
}
Self::record_event(conn, id, EventType::Closed, actor, None, reason)?;
Self::mark_dirty_internal(conn, id)?;
Ok(())
})
}
fn delete_issue(&self, id: &str, actor: &str, reason: Option<&str>) -> Result<()> {
self.with_conn(|conn| {
let now = Utc::now().to_rfc3339();
let rows = conn.execute(
"UPDATE issues SET status = 'tombstone', deleted_at = ?, deleted_by = ?, delete_reason = ?, updated_at = ? WHERE id = ?",
params![now, actor, reason, now, id],
)?;
if rows == 0 {
return Err(StorageError::NotFound(id.to_string()));
}
Self::record_event(conn, id, EventType::StatusChanged, actor, Some("open"), Some("tombstone"))?;
Self::mark_dirty_internal(conn, id)?;
Ok(())
})
}
fn search_issues(&self, filter: &IssueFilter) -> Result<Vec<Issue>> {
self.with_conn(|conn| {
let mut sql = String::from("SELECT * FROM issues WHERE 1=1");
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(ref status) = filter.status {
sql.push_str(" AND status = ?");
params.push(Box::new(status.as_str().to_string()));
}
if let Some(ref statuses) = filter.statuses {
let placeholders: Vec<_> = statuses.iter().map(|_| "?").collect();
sql.push_str(&format!(" AND status IN ({})", placeholders.join(",")));
for s in statuses {
params.push(Box::new(s.as_str().to_string()));
}
}
if !filter.include_tombstones {
sql.push_str(" AND status != 'tombstone'");
}
if let Some(ref issue_type) = filter.issue_type {
sql.push_str(" AND issue_type = ?");
params.push(Box::new(issue_type.as_str().to_string()));
}
if let Some(ref assignee) = filter.assignee {
sql.push_str(" AND assignee = ?");
params.push(Box::new(assignee.clone()));
}
if let Some(priority) = filter.priority {
sql.push_str(" AND priority = ?");
params.push(Box::new(priority));
}
if let Some(ref text) = filter.text_search {
sql.push_str(" AND (title LIKE ? OR description LIKE ? OR notes LIKE ?)");
let pattern = format!("%{}%", text);
params.push(Box::new(pattern.clone()));
params.push(Box::new(pattern.clone()));
params.push(Box::new(pattern));
}
sql.push_str(" ORDER BY ");
match filter.sort_by {
Some(crate::types::filter::SortField::Priority) => sql.push_str("priority"),
Some(crate::types::filter::SortField::Title) => sql.push_str("title"),
Some(crate::types::filter::SortField::UpdatedAt) => sql.push_str("updated_at"),
_ => sql.push_str("created_at"),
}
if filter.sort_desc {
sql.push_str(" DESC");
}
if let Some(limit) = filter.limit {
sql.push_str(&format!(" LIMIT {}", limit));
}
if let Some(offset) = filter.offset {
sql.push_str(&format!(" OFFSET {}", offset));
}
let mut stmt = conn.prepare(&sql)?;
let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let issues: Vec<Issue> = stmt.query_map(params_refs.as_slice(), |row| row_to_issue(row))?
.filter_map(|r| r.ok())
.collect();
Ok(issues)
})
}
fn add_dependency(&self, dep: &Dependency) -> Result<()> {
if dep.dep_type.check_cycles() && self.would_create_cycle(&dep.issue_id, &dep.depends_on_id, dep.dep_type)? {
return Err(StorageError::CycleDetected {
from: dep.issue_id.clone(),
to: dep.depends_on_id.clone(),
});
}
self.with_conn(|conn| {
conn.execute(
"INSERT OR REPLACE INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id)
VALUES (?, ?, ?, ?, ?, ?, ?)",
params![
dep.issue_id,
dep.depends_on_id,
dep.dep_type.as_str(),
dep.created_at.to_rfc3339(),
dep.created_by,
dep.metadata,
dep.thread_id,
],
)?;
Self::record_event(
conn,
&dep.issue_id,
EventType::DependencyAdded,
dep.created_by.as_deref().unwrap_or("system"),
None,
Some(&dep.depends_on_id),
)?;
Self::mark_dirty_internal(conn, &dep.issue_id)?;
Ok(())
})
}
fn remove_dependency(&self, issue_id: &str, depends_on_id: &str) -> Result<()> {
self.with_conn(|conn| {
let rows = conn.execute(
"DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ?",
params![issue_id, depends_on_id],
)?;
if rows > 0 {
Self::record_event(conn, issue_id, EventType::DependencyRemoved, "system", Some(depends_on_id), None)?;
Self::mark_dirty_internal(conn, issue_id)?;
}
Ok(())
})
}
fn get_dependencies(&self, issue_id: &str) -> Result<Vec<Dependency>> {
self.with_conn(|conn| {
let mut stmt = conn.prepare(
"SELECT issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id
FROM dependencies WHERE issue_id = ?"
)?;
let deps = stmt.query_map([issue_id], |row| row_to_dependency(row))?
.filter_map(|r| r.ok())
.collect();
Ok(deps)
})
}
fn get_dependents(&self, issue_id: &str) -> Result<Vec<Dependency>> {
self.with_conn(|conn| {
let mut stmt = conn.prepare(
"SELECT issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id
FROM dependencies WHERE depends_on_id = ?"
)?;
let deps = stmt.query_map([issue_id], |row| row_to_dependency(row))?
.filter_map(|r| r.ok())
.collect();
Ok(deps)
})
}
fn would_create_cycle(&self, from_id: &str, to_id: &str, dep_type: DependencyType) -> Result<bool> {
if !dep_type.check_cycles() {
return Ok(false);
}
self.with_conn(|conn| {
let sql = r#"
WITH RECURSIVE reachable(id, depth) AS (
SELECT ?, 0
UNION
SELECT d.depends_on_id, r.depth + 1
FROM reachable r
JOIN dependencies d ON d.issue_id = r.id
WHERE r.depth < 100
AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
)
SELECT 1 FROM reachable WHERE id = ? LIMIT 1
"#;
let exists: bool = conn.query_row(sql, params![to_id, from_id], |_| Ok(true))
.optional()?
.unwrap_or(false);
Ok(exists)
})
}
fn get_ready_work(&self) -> Result<Vec<Issue>> {
self.with_conn(|conn| {
let sql = r#"
SELECT i.* FROM issues i
WHERE i.status = 'open'
AND i.deleted_at IS NULL
AND NOT EXISTS (
SELECT 1 FROM dependencies d
JOIN issues blocker ON blocker.id = d.depends_on_id
WHERE d.issue_id = i.id
AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
AND blocker.status NOT IN ('closed', 'tombstone')
)
ORDER BY i.priority, i.created_at
"#;
let mut stmt = conn.prepare(sql)?;
let issues = stmt.query_map([], |row| row_to_issue(row))?
.filter_map(|r| r.ok())
.collect();
Ok(issues)
})
}
fn get_blocked_issues(&self) -> Result<Vec<BlockedIssue>> {
self.with_conn(|conn| {
let sql = r#"
SELECT i.*, COUNT(d.depends_on_id) as blocking_count,
GROUP_CONCAT(d.depends_on_id) as blocking_ids
FROM issues i
JOIN dependencies d ON d.issue_id = i.id
JOIN issues blocker ON blocker.id = d.depends_on_id
WHERE i.status IN ('open', 'blocked')
AND i.deleted_at IS NULL
AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
AND blocker.status NOT IN ('closed', 'tombstone')
GROUP BY i.id
ORDER BY blocking_count DESC, i.priority, i.created_at
"#;
let mut stmt = conn.prepare(sql)?;
let blocked = stmt.query_map([], |row| {
let issue = row_to_issue(row)?;
let blocking_count: usize = row.get("blocking_count")?;
let blocking_ids_str: String = row.get("blocking_ids")?;
let blocking_ids: Vec<String> = blocking_ids_str
.split(',')
.map(|s| s.to_string())
.collect();
Ok(BlockedIssue {
issue,
blocking_count,
blocking_ids,
})
})?
.filter_map(|r| r.ok())
.collect();
Ok(blocked)
})
}
fn is_blocked(&self, issue_id: &str) -> Result<bool> {
self.with_conn(|conn| {
let sql = r#"
SELECT 1 FROM dependencies d
JOIN issues blocker ON blocker.id = d.depends_on_id
WHERE d.issue_id = ?
AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
AND blocker.status NOT IN ('closed', 'tombstone')
LIMIT 1
"#;
let blocked: bool = conn.query_row(sql, [issue_id], |_| Ok(true))
.optional()?
.unwrap_or(false);
Ok(blocked)
})
}
fn add_label(&self, issue_id: &str, label: &str) -> Result<()> {
self.with_conn(|conn| {
conn.execute(
"INSERT OR IGNORE INTO labels (issue_id, label) VALUES (?, ?)",
params![issue_id, label],
)?;
Self::record_event(conn, issue_id, EventType::LabelAdded, "system", None, Some(label))?;
Self::mark_dirty_internal(conn, issue_id)?;
Ok(())
})
}
fn remove_label(&self, issue_id: &str, label: &str) -> Result<()> {
self.with_conn(|conn| {
let rows = conn.execute(
"DELETE FROM labels WHERE issue_id = ? AND label = ?",
params![issue_id, label],
)?;
if rows > 0 {
Self::record_event(conn, issue_id, EventType::LabelRemoved, "system", Some(label), None)?;
Self::mark_dirty_internal(conn, issue_id)?;
}
Ok(())
})
}
fn get_labels(&self, issue_id: &str) -> Result<Vec<String>> {
self.with_conn(|conn| {
let mut stmt = conn.prepare("SELECT label FROM labels WHERE issue_id = ?")?;
let labels = stmt.query_map([issue_id], |row| row.get(0))?
.filter_map(|r| r.ok())
.collect();
Ok(labels)
})
}
fn get_issues_by_label(&self, label: &str) -> Result<Vec<Issue>> {
self.with_conn(|conn| {
let sql = r#"
SELECT i.* FROM issues i
JOIN labels l ON l.issue_id = i.id
WHERE l.label = ?
AND i.status != 'tombstone'
ORDER BY i.created_at DESC
"#;
let mut stmt = conn.prepare(sql)?;
let issues = stmt.query_map([label], |row| row_to_issue(row))?
.filter_map(|r| r.ok())
.collect();
Ok(issues)
})
}
fn add_comment(&self, issue_id: &str, author: &str, text: &str) -> Result<i64> {
self.with_conn(|conn| {
conn.execute(
"INSERT INTO comments (issue_id, author, text, created_at) VALUES (?, ?, ?, ?)",
params![issue_id, author, text, Utc::now().to_rfc3339()],
)?;
let id = conn.last_insert_rowid();
Self::record_event(conn, issue_id, EventType::Commented, author, None, None)?;
Self::mark_dirty_internal(conn, issue_id)?;
Ok(id)
})
}
fn get_comments(&self, issue_id: &str) -> Result<Vec<Comment>> {
self.with_conn(|conn| {
let mut stmt = conn.prepare(
"SELECT id, issue_id, author, text, created_at FROM comments WHERE issue_id = ? ORDER BY created_at"
)?;
let comments = stmt.query_map([issue_id], |row| {
Ok(Comment {
id: row.get(0)?,
issue_id: row.get(1)?,
author: row.get(2)?,
text: row.get(3)?,
created_at: parse_datetime(&row.get::<_, String>(4)?),
})
})?
.filter_map(|r| r.ok())
.collect();
Ok(comments)
})
}
fn get_events(&self, issue_id: &str) -> Result<Vec<Event>> {
self.with_conn(|conn| {
let mut stmt = conn.prepare(
"SELECT id, issue_id, event_type, actor, old_value, new_value, comment, created_at
FROM events WHERE issue_id = ? ORDER BY created_at"
)?;
let events = stmt.query_map([issue_id], |row| {
let event_type_str: String = row.get(2)?;
Ok(Event {
id: row.get(0)?,
issue_id: row.get(1)?,
event_type: event_type_str.parse().unwrap_or(EventType::Updated),
actor: row.get(3)?,
old_value: row.get(4)?,
new_value: row.get(5)?,
comment: row.get(6)?,
created_at: parse_datetime(&row.get::<_, String>(7)?),
})
})?
.filter_map(|r| r.ok())
.collect();
Ok(events)
})
}
fn set_config(&self, key: &str, value: &str) -> Result<()> {
self.with_conn(|conn| {
conn.execute(
"INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
params![key, value],
)?;
Ok(())
})
}
fn get_config(&self, key: &str) -> Result<Option<String>> {
self.with_conn(|conn| {
conn.query_row(
"SELECT value FROM config WHERE key = ?",
[key],
|row| row.get(0),
).optional().map_err(|e| e.into())
})
}
fn delete_config(&self, key: &str) -> Result<()> {
self.with_conn(|conn| {
conn.execute("DELETE FROM config WHERE key = ?", [key])?;
Ok(())
})
}
fn get_all_config(&self) -> Result<HashMap<String, String>> {
self.with_conn(|conn| {
let mut stmt = conn.prepare("SELECT key, value FROM config")?;
let config = stmt.query_map([], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
})?
.filter_map(|r| r.ok())
.collect();
Ok(config)
})
}
fn mark_dirty(&self, issue_id: &str) -> Result<()> {
self.with_conn(|conn| Self::mark_dirty_internal(conn, issue_id))
}
fn get_dirty_issues(&self) -> Result<Vec<String>> {
self.with_conn(|conn| {
let mut stmt = conn.prepare("SELECT issue_id FROM dirty_issues ORDER BY marked_at")?;
let ids = stmt.query_map([], |row| row.get(0))?
.filter_map(|r| r.ok())
.collect();
Ok(ids)
})
}
fn clear_dirty(&self, issue_ids: &[String]) -> Result<()> {
if issue_ids.is_empty() {
return Ok(());
}
self.with_conn(|conn| {
let placeholders: Vec<_> = issue_ids.iter().map(|_| "?").collect();
let sql = format!(
"DELETE FROM dirty_issues WHERE issue_id IN ({})",
placeholders.join(",")
);
let params: Vec<&dyn rusqlite::ToSql> = issue_ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
conn.execute(&sql, params.as_slice())?;
Ok(())
})
}
fn get_statistics(&self) -> Result<Statistics> {
self.with_conn(|conn| {
let total_issues: usize = conn.query_row(
"SELECT COUNT(*) FROM issues WHERE status != 'tombstone'",
[],
|row| row.get(0),
)?;
let open_issues: usize = conn.query_row(
"SELECT COUNT(*) FROM issues WHERE status = 'open'",
[],
|row| row.get(0),
)?;
let in_progress_issues: usize = conn.query_row(
"SELECT COUNT(*) FROM issues WHERE status = 'in_progress'",
[],
|row| row.get(0),
)?;
let blocked_issues: usize = conn.query_row(
"SELECT COUNT(*) FROM issues WHERE status = 'blocked'",
[],
|row| row.get(0),
)?;
let closed_issues: usize = conn.query_row(
"SELECT COUNT(*) FROM issues WHERE status = 'closed'",
[],
|row| row.get(0),
)?;
let total_dependencies: usize = conn.query_row(
"SELECT COUNT(*) FROM dependencies",
[],
|row| row.get(0),
)?;
let ready_issues: usize = conn.query_row(
r#"
SELECT COUNT(*) FROM issues i
WHERE i.status = 'open'
AND i.deleted_at IS NULL
AND NOT EXISTS (
SELECT 1 FROM dependencies d
JOIN issues blocker ON blocker.id = d.depends_on_id
WHERE d.issue_id = i.id
AND d.type IN ('blocks', 'parent_child', 'conditional_blocks', 'waits_for')
AND blocker.status NOT IN ('closed', 'tombstone')
)
"#,
[],
|row| row.get(0),
)?;
Ok(Statistics {
total_issues,
open_issues,
in_progress_issues,
blocked_issues,
closed_issues,
ready_issues,
total_dependencies,
})
})
}
fn next_child_counter(&self, parent_id: &str) -> Result<u32> {
self.with_conn(|conn| {
conn.execute(
"INSERT INTO child_counters (parent_id, counter) VALUES (?, 1)
ON CONFLICT(parent_id) DO UPDATE SET counter = counter + 1",
[parent_id],
)?;
let counter: u32 = conn.query_row(
"SELECT counter FROM child_counters WHERE parent_id = ?",
[parent_id],
|row| row.get(0),
)?;
Ok(counter)
})
}
fn transaction<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce() -> Result<T>,
{
self.with_conn_mut(|conn| {
let tx = conn.transaction()?;
let result = f();
match result {
Ok(v) => {
tx.commit()?;
Ok(v)
}
Err(e) => {
Err(e)
}
}
})
}
fn close(&self) -> Result<()> {
self.closed.store(true, Ordering::Release);
Ok(())
}
}
fn parse_datetime(s: &str) -> DateTime<Utc> {
DateTime::parse_from_rfc3339(s)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now())
}
fn row_to_issue(row: &rusqlite::Row) -> rusqlite::Result<Issue> {
let status_str: String = row.get("status")?;
let issue_type_str: String = row.get("issue_type")?;
let agent_state_str: Option<String> = row.get("agent_state")?;
let mol_type_str: Option<String> = row.get("mol_type")?;
Ok(Issue {
id: row.get("id")?,
content_hash: row.get("content_hash")?,
title: row.get("title")?,
description: row.get("description")?,
design: row.get("design")?,
acceptance_criteria: row.get("acceptance_criteria")?,
notes: row.get("notes")?,
status: status_str.parse().unwrap_or_default(),
priority: row.get("priority")?,
issue_type: issue_type_str.parse().unwrap_or_default(),
assignee: row.get("assignee")?,
owner: row.get("owner")?,
estimated_minutes: row.get("estimated_minutes")?,
created_at: parse_datetime(&row.get::<_, String>("created_at")?),
created_by: row.get("created_by")?,
updated_at: parse_datetime(&row.get::<_, String>("updated_at")?),
closed_at: row.get::<_, Option<String>>("closed_at")?.map(|s| parse_datetime(&s)),
close_reason: row.get("close_reason")?,
due_at: row.get::<_, Option<String>>("due_at")?.map(|s| parse_datetime(&s)),
defer_until: row.get::<_, Option<String>>("defer_until")?.map(|s| parse_datetime(&s)),
external_ref: row.get("external_ref")?,
source_system: row.get("source_system")?,
labels: Vec::new(), deleted_at: row.get::<_, Option<String>>("deleted_at")?.map(|s| parse_datetime(&s)),
deleted_by: row.get("deleted_by")?,
delete_reason: row.get("delete_reason")?,
compaction_level: row.get("compaction_level")?,
compacted_at: row.get::<_, Option<String>>("compacted_at")?.map(|s| parse_datetime(&s)),
compacted_at_commit: row.get("compacted_at_commit")?,
original_size: row.get("original_size")?,
agent_state: agent_state_str.and_then(|s| s.parse().ok()),
mol_type: mol_type_str.and_then(|s| s.parse().ok()),
hook_bead: row.get("hook_bead")?,
role_bead: row.get("role_bead")?,
rig: row.get("rig")?,
last_activity: row.get::<_, Option<String>>("last_activity")?.map(|s| parse_datetime(&s)),
pinned: row.get::<_, i32>("pinned")? != 0,
is_template: row.get::<_, i32>("is_template")? != 0,
ephemeral: row.get::<_, i32>("ephemeral")? != 0,
})
}
fn row_to_dependency(row: &rusqlite::Row) -> rusqlite::Result<Dependency> {
let dep_type_str: String = row.get(2)?;
Ok(Dependency {
issue_id: row.get(0)?,
depends_on_id: row.get(1)?,
dep_type: dep_type_str.parse().unwrap_or_default(),
created_at: parse_datetime(&row.get::<_, String>(3)?),
created_by: row.get(4)?,
metadata: row.get(5)?,
thread_id: row.get(6)?,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_create_and_get_issue() {
let storage = SqliteStorage::in_memory().unwrap();
let issue = Issue::new("bd-a1b2", "Test task", "alice");
storage.create_issue(&issue).unwrap();
let retrieved = storage.get_issue("bd-a1b2").unwrap().unwrap();
assert_eq!(retrieved.id, "bd-a1b2");
assert_eq!(retrieved.title, "Test task");
assert_eq!(retrieved.created_by, "alice");
}
#[test]
fn test_dependency_cycle_detection() {
let storage = SqliteStorage::in_memory().unwrap();
storage.create_issue(&Issue::new("bd-1", "Task 1", "alice")).unwrap();
storage.create_issue(&Issue::new("bd-2", "Task 2", "alice")).unwrap();
storage.create_issue(&Issue::new("bd-3", "Task 3", "alice")).unwrap();
storage.add_dependency(&Dependency::blocks("bd-1", "bd-2")).unwrap();
storage.add_dependency(&Dependency::blocks("bd-2", "bd-3")).unwrap();
let result = storage.add_dependency(&Dependency::blocks("bd-3", "bd-1"));
assert!(matches!(result, Err(StorageError::CycleDetected { .. })));
}
#[test]
fn test_ready_work() {
let storage = SqliteStorage::in_memory().unwrap();
storage.create_issue(&Issue::new("bd-1", "Ready task", "alice")).unwrap();
storage.create_issue(&Issue::new("bd-2", "Blocked task", "alice")).unwrap();
storage.create_issue(&Issue::new("bd-3", "Blocker", "alice")).unwrap();
storage.add_dependency(&Dependency::blocks("bd-2", "bd-3")).unwrap();
let ready = storage.get_ready_work().unwrap();
assert_eq!(ready.len(), 2);
storage.close_issue("bd-3", "alice", None).unwrap();
let ready = storage.get_ready_work().unwrap();
assert_eq!(ready.len(), 2); }
#[test]
fn test_labels() {
let storage = SqliteStorage::in_memory().unwrap();
storage.create_issue(&Issue::new("bd-1", "Task", "alice")).unwrap();
storage.add_label("bd-1", "bug").unwrap();
storage.add_label("bd-1", "urgent").unwrap();
let labels = storage.get_labels("bd-1").unwrap();
assert!(labels.contains(&"bug".to_string()));
assert!(labels.contains(&"urgent".to_string()));
let issues = storage.get_issues_by_label("bug").unwrap();
assert_eq!(issues.len(), 1);
assert_eq!(issues[0].id, "bd-1");
}
}