1use rusqlite::Connection;
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11use crate::error::ServerError;
12
13#[derive(Clone)]
15pub struct Database {
16 conn: Arc<Mutex<Connection>>,
17}
18
19impl Database {
20 fn ignore_duplicate_column(
21 result: Result<usize, rusqlite::Error>,
22 ) -> Result<(), rusqlite::Error> {
23 match result {
24 Ok(_) => Ok(()),
25 Err(error)
26 if error
27 .to_string()
28 .to_ascii_lowercase()
29 .contains("duplicate column name") =>
30 {
31 Ok(())
32 }
33 Err(error) => Err(error),
34 }
35 }
36
37 pub fn open(db_path: &str) -> Result<Self, ServerError> {
39 let path = Path::new(db_path);
40 if let Some(parent) = path.parent() {
41 std::fs::create_dir_all(parent).ok();
42 }
43
44 let conn = Connection::open(db_path)
45 .map_err(|e| ServerError::Database(format!("Failed to open database: {}", e)))?;
46
47 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")
48 .map_err(|e| ServerError::Database(format!("Failed to set pragmas: {}", e)))?;
49
50 let db = Self {
51 conn: Arc::new(Mutex::new(conn)),
52 };
53
54 db.initialize_tables()?;
55
56 tracing::info!("SQLite database opened at: {}", db_path);
57 Ok(db)
58 }
59
60 pub fn open_in_memory() -> Result<Self, ServerError> {
62 let conn = Connection::open_in_memory()
63 .map_err(|e| ServerError::Database(format!("Failed to open in-memory db: {}", e)))?;
64
65 conn.execute_batch("PRAGMA foreign_keys=ON;")
66 .map_err(|e| ServerError::Database(format!("Failed to set pragmas: {}", e)))?;
67
68 let db = Self {
69 conn: Arc::new(Mutex::new(conn)),
70 };
71
72 db.initialize_tables()?;
73 Ok(db)
74 }
75
76 pub fn with_conn<F, T>(&self, f: F) -> Result<T, ServerError>
79 where
80 F: FnOnce(&Connection) -> Result<T, rusqlite::Error>,
81 {
82 let conn = self
83 .conn
84 .lock()
85 .map_err(|e| ServerError::Database(format!("Lock poisoned: {}", e)))?;
86 f(&conn).map_err(|e| ServerError::Database(e.to_string()))
87 }
88
89 pub async fn with_conn_async<F, T>(&self, f: F) -> Result<T, ServerError>
91 where
92 F: FnOnce(&Connection) -> Result<T, rusqlite::Error> + Send + 'static,
93 T: Send + 'static,
94 {
95 let db = self.clone();
96 tokio::task::spawn_blocking(move || db.with_conn(f))
97 .await
98 .map_err(|e| ServerError::Database(format!("Task join error: {}", e)))?
99 }
100
101 fn initialize_tables(&self) -> Result<(), ServerError> {
103 self.with_conn(|conn| {
104 conn.execute_batch(
105 "
106 CREATE TABLE IF NOT EXISTS workspaces (
107 id TEXT PRIMARY KEY,
108 title TEXT NOT NULL,
109 status TEXT NOT NULL DEFAULT 'active',
110 metadata TEXT NOT NULL DEFAULT '{}',
111 created_at INTEGER NOT NULL,
112 updated_at INTEGER NOT NULL
113 );
114
115 CREATE TABLE IF NOT EXISTS codebases (
116 id TEXT PRIMARY KEY,
117 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
118 repo_path TEXT NOT NULL,
119 branch TEXT,
120 label TEXT,
121 is_default INTEGER NOT NULL DEFAULT 0,
122 source_type TEXT,
123 source_url TEXT,
124 created_at INTEGER NOT NULL,
125 updated_at INTEGER NOT NULL
126 );
127 CREATE INDEX IF NOT EXISTS idx_codebases_workspace ON codebases(workspace_id);
128
129 CREATE TABLE IF NOT EXISTS acp_sessions (
130 id TEXT PRIMARY KEY,
131 name TEXT,
132 cwd TEXT NOT NULL,
133 branch TEXT,
134 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
135 routa_agent_id TEXT,
136 provider TEXT,
137 role TEXT,
138 mode_id TEXT,
139 first_prompt_sent INTEGER DEFAULT 0,
140 message_history TEXT NOT NULL DEFAULT '[]',
141 created_at INTEGER NOT NULL,
142 updated_at INTEGER NOT NULL
143 );
144 CREATE INDEX IF NOT EXISTS idx_acp_sessions_workspace ON acp_sessions(workspace_id);
145
146 CREATE TABLE IF NOT EXISTS skills (
147 id TEXT PRIMARY KEY,
148 name TEXT NOT NULL,
149 description TEXT NOT NULL DEFAULT '',
150 source TEXT NOT NULL,
151 catalog_type TEXT NOT NULL DEFAULT 'skillssh',
152 files TEXT NOT NULL DEFAULT '[]',
153 license TEXT,
154 metadata TEXT NOT NULL DEFAULT '{}',
155 installs INTEGER NOT NULL DEFAULT 0,
156 created_at INTEGER NOT NULL,
157 updated_at INTEGER NOT NULL
158 );
159
160 CREATE TABLE IF NOT EXISTS workspace_skills (
161 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
162 skill_id TEXT NOT NULL REFERENCES skills(id) ON DELETE CASCADE,
163 installed_at INTEGER NOT NULL,
164 PRIMARY KEY (workspace_id, skill_id)
165 );
166
167 CREATE TABLE IF NOT EXISTS agents (
168 id TEXT PRIMARY KEY,
169 name TEXT NOT NULL,
170 role TEXT NOT NULL,
171 model_tier TEXT NOT NULL DEFAULT 'SMART',
172 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
173 parent_id TEXT,
174 status TEXT NOT NULL DEFAULT 'PENDING',
175 metadata TEXT NOT NULL DEFAULT '{}',
176 created_at INTEGER NOT NULL,
177 updated_at INTEGER NOT NULL
178 );
179
180 CREATE TABLE IF NOT EXISTS tasks (
181 id TEXT PRIMARY KEY,
182 title TEXT NOT NULL,
183 objective TEXT NOT NULL,
184 comment TEXT,
185 scope TEXT,
186 acceptance_criteria TEXT,
187 verification_commands TEXT,
188 test_cases TEXT,
189 assigned_to TEXT,
190 status TEXT NOT NULL DEFAULT 'PENDING',
191 board_id TEXT,
192 column_id TEXT,
193 position INTEGER NOT NULL DEFAULT 0,
194 priority TEXT,
195 labels TEXT NOT NULL DEFAULT '[]',
196 assignee TEXT,
197 assigned_provider TEXT,
198 assigned_role TEXT,
199 assigned_specialist_id TEXT,
200 assigned_specialist_name TEXT,
201 trigger_session_id TEXT,
202 github_id TEXT,
203 github_number INTEGER,
204 github_url TEXT,
205 github_repo TEXT,
206 github_state TEXT,
207 github_synced_at INTEGER,
208 last_sync_error TEXT,
209 dependencies TEXT NOT NULL DEFAULT '[]',
210 parallel_group TEXT,
211 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
212 session_id TEXT,
213 session_ids TEXT NOT NULL DEFAULT '[]',
214 lane_sessions TEXT NOT NULL DEFAULT '[]',
215 lane_handoffs TEXT NOT NULL DEFAULT '[]',
216 completion_summary TEXT,
217 verification_verdict TEXT,
218 verification_report TEXT,
219 codebase_ids TEXT NOT NULL DEFAULT '[]',
220 worktree_id TEXT,
221 version INTEGER NOT NULL DEFAULT 1,
222 created_at INTEGER NOT NULL,
223 updated_at INTEGER NOT NULL
224 );
225 CREATE TABLE IF NOT EXISTS artifacts (
226 id TEXT PRIMARY KEY,
227 type TEXT NOT NULL,
228 task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
229 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
230 provided_by_agent_id TEXT,
231 requested_by_agent_id TEXT,
232 request_id TEXT,
233 content TEXT,
234 context TEXT,
235 status TEXT NOT NULL DEFAULT 'provided',
236 expires_at INTEGER,
237 metadata TEXT,
238 created_at INTEGER NOT NULL,
239 updated_at INTEGER NOT NULL
240 );
241 CREATE TABLE IF NOT EXISTS kanban_boards (
242 id TEXT PRIMARY KEY,
243 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
244 name TEXT NOT NULL,
245 is_default INTEGER NOT NULL DEFAULT 0,
246 columns TEXT NOT NULL DEFAULT '[]',
247 created_at INTEGER NOT NULL,
248 updated_at INTEGER NOT NULL
249 );
250 CREATE TABLE IF NOT EXISTS notes (
251 id TEXT NOT NULL,
252 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
253 session_id TEXT,
254 title TEXT NOT NULL,
255 content TEXT NOT NULL DEFAULT '',
256 type TEXT NOT NULL DEFAULT 'general',
257 task_status TEXT,
258 assigned_agent_ids TEXT,
259 parent_note_id TEXT,
260 linked_task_id TEXT,
261 custom_metadata TEXT,
262 created_at INTEGER NOT NULL,
263 updated_at INTEGER NOT NULL,
264 PRIMARY KEY (workspace_id, id)
265 );
266
267 CREATE TABLE IF NOT EXISTS messages (
268 id TEXT PRIMARY KEY,
269 agent_id TEXT NOT NULL,
270 role TEXT NOT NULL,
271 content TEXT NOT NULL,
272 timestamp INTEGER NOT NULL,
273 tool_name TEXT,
274 tool_args TEXT,
275 turn INTEGER
276 );
277
278 CREATE TABLE IF NOT EXISTS event_subscriptions (
279 id TEXT PRIMARY KEY,
280 agent_id TEXT NOT NULL,
281 agent_name TEXT NOT NULL,
282 event_types TEXT NOT NULL,
283 exclude_self INTEGER NOT NULL DEFAULT 1,
284 one_shot INTEGER NOT NULL DEFAULT 0,
285 wait_group_id TEXT,
286 priority INTEGER NOT NULL DEFAULT 0,
287 created_at INTEGER NOT NULL
288 );
289
290 CREATE TABLE IF NOT EXISTS pending_events (
291 id TEXT PRIMARY KEY,
292 agent_id TEXT NOT NULL,
293 event_type TEXT NOT NULL,
294 source_agent_id TEXT NOT NULL,
295 workspace_id TEXT NOT NULL,
296 data TEXT NOT NULL DEFAULT '{}',
297 timestamp INTEGER NOT NULL
298 );
299
300 CREATE INDEX IF NOT EXISTS idx_agents_workspace ON agents(workspace_id);
301 CREATE INDEX IF NOT EXISTS idx_tasks_workspace ON tasks(workspace_id);
302 CREATE INDEX IF NOT EXISTS idx_artifacts_task ON artifacts(task_id);
303 CREATE INDEX IF NOT EXISTS idx_artifacts_workspace ON artifacts(workspace_id);
304 CREATE INDEX IF NOT EXISTS idx_kanban_boards_workspace ON kanban_boards(workspace_id);
305 CREATE UNIQUE INDEX IF NOT EXISTS uq_kanban_boards_default_workspace ON kanban_boards(workspace_id) WHERE is_default = 1;
306 CREATE INDEX IF NOT EXISTS idx_notes_workspace ON notes(workspace_id);
307 CREATE INDEX IF NOT EXISTS idx_messages_agent ON messages(agent_id);
308
309 CREATE TABLE IF NOT EXISTS schedules (
310 id TEXT PRIMARY KEY,
311 name TEXT NOT NULL,
312 cron_expr TEXT NOT NULL,
313 task_prompt TEXT NOT NULL,
314 agent_id TEXT NOT NULL,
315 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
316 enabled INTEGER NOT NULL DEFAULT 1,
317 last_run_at INTEGER,
318 next_run_at INTEGER,
319 last_task_id TEXT,
320 prompt_template TEXT,
321 created_at INTEGER NOT NULL,
322 updated_at INTEGER NOT NULL
323 );
324 CREATE INDEX IF NOT EXISTS idx_schedules_workspace ON schedules(workspace_id);
325 CREATE INDEX IF NOT EXISTS idx_schedules_next_run ON schedules(next_run_at) WHERE enabled = 1;
326
327 CREATE TABLE IF NOT EXISTS worktrees (
328 id TEXT PRIMARY KEY,
329 codebase_id TEXT NOT NULL REFERENCES codebases(id) ON DELETE CASCADE,
330 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
331 worktree_path TEXT NOT NULL,
332 branch TEXT NOT NULL,
333 base_branch TEXT NOT NULL,
334 status TEXT NOT NULL DEFAULT 'creating',
335 session_id TEXT,
336 label TEXT,
337 error_message TEXT,
338 created_at INTEGER NOT NULL,
339 updated_at INTEGER NOT NULL
340 );
341 CREATE INDEX IF NOT EXISTS idx_worktrees_workspace ON worktrees(workspace_id);
342 CREATE INDEX IF NOT EXISTS idx_worktrees_codebase ON worktrees(codebase_id);
343 CREATE UNIQUE INDEX IF NOT EXISTS uq_worktrees_codebase_branch
344 ON worktrees(codebase_id, branch);
345 CREATE UNIQUE INDEX IF NOT EXISTS uq_worktrees_path
346 ON worktrees(worktree_path);
347 "
348 )
349 })?;
350 self.run_migrations()
351 }
352
353 fn run_migrations(&self) -> Result<(), ServerError> {
355 self.with_conn(|conn| {
356 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN comment TEXT", []))?;
358 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN session_id TEXT", []))?;
359 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN board_id TEXT", []))?;
360 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN column_id TEXT", []))?;
361 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN position INTEGER NOT NULL DEFAULT 0", []))?;
362 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN priority TEXT", []))?;
363 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN labels TEXT NOT NULL DEFAULT '[]'", []))?;
364 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN assignee TEXT", []))?;
365 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN assigned_provider TEXT", []))?;
366 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN assigned_role TEXT", []))?;
367 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN assigned_specialist_id TEXT", []))?;
368 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN assigned_specialist_name TEXT", []))?;
369 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN trigger_session_id TEXT", []))?;
370 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN github_id TEXT", []))?;
371 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN github_number INTEGER", []))?;
372 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN github_url TEXT", []))?;
373 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN github_repo TEXT", []))?;
374 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN github_state TEXT", []))?;
375 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN github_synced_at INTEGER", []))?;
376 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN last_sync_error TEXT", []))?;
377 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN test_cases TEXT", []))?;
378 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN codebase_ids TEXT NOT NULL DEFAULT '[]'", []))?;
379 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN worktree_id TEXT", []))?;
380 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN session_ids TEXT NOT NULL DEFAULT '[]'", []))?;
381 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN lane_sessions TEXT NOT NULL DEFAULT '[]'", []))?;
382 Self::ignore_duplicate_column(conn.execute("ALTER TABLE tasks ADD COLUMN lane_handoffs TEXT NOT NULL DEFAULT '[]'", []))?;
383 Self::ignore_duplicate_column(conn.execute("ALTER TABLE notes ADD COLUMN session_id TEXT", []))?;
385 Self::ignore_duplicate_column(conn.execute("ALTER TABLE acp_sessions ADD COLUMN branch TEXT", []))?;
386 Self::ignore_duplicate_column(conn.execute("ALTER TABLE acp_sessions ADD COLUMN parent_session_id TEXT", []))?;
388 Self::ignore_duplicate_column(conn.execute("ALTER TABLE codebases ADD COLUMN source_type TEXT", []))?;
389 Self::ignore_duplicate_column(conn.execute("ALTER TABLE codebases ADD COLUMN source_url TEXT", []))?;
390 conn.execute_batch(
391 "CREATE TABLE IF NOT EXISTS kanban_boards (
392 id TEXT PRIMARY KEY,
393 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
394 name TEXT NOT NULL,
395 is_default INTEGER NOT NULL DEFAULT 0,
396 columns TEXT NOT NULL DEFAULT '[]',
397 created_at INTEGER NOT NULL,
398 updated_at INTEGER NOT NULL
399 );
400 CREATE INDEX IF NOT EXISTS idx_kanban_boards_workspace ON kanban_boards(workspace_id);
401 CREATE UNIQUE INDEX IF NOT EXISTS uq_kanban_boards_default_workspace ON kanban_boards(workspace_id) WHERE is_default = 1;"
402 )?;
403 conn.execute_batch(
404 "CREATE TABLE IF NOT EXISTS artifacts (
405 id TEXT PRIMARY KEY,
406 type TEXT NOT NULL,
407 task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
408 workspace_id TEXT NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
409 provided_by_agent_id TEXT,
410 requested_by_agent_id TEXT,
411 request_id TEXT,
412 content TEXT,
413 context TEXT,
414 status TEXT NOT NULL DEFAULT 'provided',
415 expires_at INTEGER,
416 metadata TEXT,
417 created_at INTEGER NOT NULL,
418 updated_at INTEGER NOT NULL
419 );
420 CREATE INDEX IF NOT EXISTS idx_artifacts_task ON artifacts(task_id);
421 CREATE INDEX IF NOT EXISTS idx_artifacts_workspace ON artifacts(workspace_id);"
422 )?;
423 Self::ignore_duplicate_column(conn.execute("ALTER TABLE kanban_boards ADD COLUMN columns TEXT NOT NULL DEFAULT '[]'", []))?;
424 let _ = conn.execute("UPDATE kanban_boards SET columns = columns_json WHERE (columns IS NULL OR columns = '[]') AND columns_json IS NOT NULL", []);
425 conn.execute_batch(
427 "CREATE INDEX IF NOT EXISTS idx_tasks_session ON tasks(session_id);
428 CREATE INDEX IF NOT EXISTS idx_notes_session ON notes(session_id);
429 CREATE INDEX IF NOT EXISTS idx_acp_sessions_parent ON acp_sessions(parent_session_id);"
430 )
431 })
432 }
433}