1use std::path::Path;
2use std::sync::Mutex;
3use async_trait::async_trait;
4
5use boarddown_core::{Board, BoardId, Task, TaskId, Query, Changeset, Version, Storage, Error as CoreError, SearchResult};
6use boarddown_schema::{Status, Column, Operation, TaskOp, Metadata, TaskBuilder as SchemaTaskBuilder};
7use crate::fts::FullTextSearch;
8
9#[derive(Debug)]
10pub struct SqliteStorage {
11 path: std::path::PathBuf,
12 conn: Mutex<rusqlite::Connection>,
13 fts: FullTextSearch,
14}
15
16impl SqliteStorage {
17 pub async fn new(path: impl AsRef<Path>) -> Result<Self, CoreError> {
18 let path = path.as_ref().to_path_buf();
19 let conn = rusqlite::Connection::open(&path).map_err(|e| CoreError::Database(e.to_string()))?;
20
21 let fts = FullTextSearch::new(true);
22
23 Ok(Self {
24 path,
25 conn: Mutex::new(conn),
26 fts,
27 })
28 }
29
30 pub async fn init(&self) -> Result<(), CoreError> {
31 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
32
33 conn.execute(
34 "CREATE TABLE IF NOT EXISTS boards (
35 id TEXT PRIMARY KEY,
36 title TEXT NOT NULL,
37 version INTEGER NOT NULL DEFAULT 0,
38 created_at TEXT NOT NULL,
39 updated_at TEXT NOT NULL
40 )",
41 [],
42 ).map_err(|e| CoreError::Database(e.to_string()))?;
43
44 conn.execute(
45 "CREATE TABLE IF NOT EXISTS columns (
46 id INTEGER PRIMARY KEY AUTOINCREMENT,
47 board_id TEXT NOT NULL,
48 name TEXT NOT NULL,
49 position INTEGER NOT NULL,
50 FOREIGN KEY (board_id) REFERENCES boards(id) ON DELETE CASCADE
51 )",
52 [],
53 ).map_err(|e| CoreError::Database(e.to_string()))?;
54
55 conn.execute(
56 "CREATE TABLE IF NOT EXISTS tasks (
57 id TEXT NOT NULL,
58 board_id TEXT NOT NULL,
59 title TEXT NOT NULL,
60 status TEXT NOT NULL,
61 column_name TEXT NOT NULL,
62 position INTEGER NOT NULL DEFAULT 0,
63 dependencies TEXT NOT NULL DEFAULT '[]',
64 metadata TEXT NOT NULL DEFAULT '{}',
65 created_at TEXT NOT NULL,
66 updated_at TEXT NOT NULL,
67 PRIMARY KEY (id, board_id),
68 FOREIGN KEY (board_id) REFERENCES boards(id) ON DELETE CASCADE
69 )",
70 [],
71 ).map_err(|e| CoreError::Database(e.to_string()))?;
72
73 conn.execute(
74 "CREATE TABLE IF NOT EXISTS operations (
75 id INTEGER PRIMARY KEY AUTOINCREMENT,
76 board_id TEXT NOT NULL,
77 task_id TEXT NOT NULL,
78 operation_type TEXT NOT NULL,
79 operation_data TEXT NOT NULL,
80 timestamp INTEGER NOT NULL,
81 client_id INTEGER NOT NULL,
82 version INTEGER NOT NULL,
83 FOREIGN KEY (board_id) REFERENCES boards(id) ON DELETE CASCADE
84 )",
85 [],
86 ).map_err(|e| CoreError::Database(e.to_string()))?;
87
88 conn.execute(
89 "CREATE INDEX IF NOT EXISTS idx_tasks_board_id ON tasks(board_id)",
90 [],
91 ).map_err(|e| CoreError::Database(e.to_string()))?;
92
93 conn.execute(
94 "CREATE INDEX IF NOT EXISTS idx_operations_board_version ON operations(board_id, version)",
95 [],
96 ).map_err(|e| CoreError::Database(e.to_string()))?;
97
98 self.fts.init(&conn)?;
99
100 Ok(())
101 }
102
103 pub async fn run_migrations(&self) -> Result<(), CoreError> {
104 Ok(())
105 }
106
107 pub async fn enable_wal(&self) -> Result<(), CoreError> {
108 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
109 conn.execute_batch("PRAGMA journal_mode=WAL;").map_err(|e| CoreError::Database(e.to_string()))?;
110 Ok(())
111 }
112
113 fn board_from_rows(&self, board_id: &str, title: &str) -> Result<Board, CoreError> {
114 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
115
116 let mut board = Board::new(board_id, title);
117
118 let mut stmt = conn.prepare(
119 "SELECT name, position FROM columns WHERE board_id = ? ORDER BY position"
120 ).map_err(|e| CoreError::Database(e.to_string()))?;
121
122 let columns: Vec<(String, i32)> = stmt
123 .query_map([board_id], |row| Ok((row.get(0)?, row.get(1)?)))
124 .map_err(|e| CoreError::Database(e.to_string()))?
125 .filter_map(|r| r.ok())
126 .collect();
127
128 for (name, _pos) in columns {
129 board.add_column(Column::new(&name));
130 }
131
132 Ok(board)
133 }
134
135 fn load_tasks_for_board(&self, board_id: &str) -> Result<std::collections::HashMap<TaskId, Task>, CoreError> {
136 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
137
138 let mut stmt = conn.prepare(
139 "SELECT id, title, status, column_name, position, dependencies, metadata, created_at, updated_at
140 FROM tasks WHERE board_id = ?"
141 ).map_err(|e| CoreError::Database(e.to_string()))?;
142
143 let rows = stmt.query_map([board_id], |row| {
144 let id: String = row.get(0)?;
145 let title: String = row.get(1)?;
146 let status: String = row.get(2)?;
147 let column_name: String = row.get(3)?;
148 let _position: i32 = row.get(4)?;
149 let dependencies_json: String = row.get(5)?;
150 let metadata_json: String = row.get(6)?;
151
152 Ok((id, title, status, column_name, dependencies_json, metadata_json))
153 }).map_err(|e| CoreError::Database(e.to_string()))?;
154
155 let mut tasks = std::collections::HashMap::new();
156
157 for row in rows {
158 let (id, title, status_str, column_name, deps_json, meta_json) = row.map_err(|e| CoreError::Database(e.to_string()))?;
159
160 let status = match status_str.as_str() {
161 "todo" => Status::Todo,
162 "inprogress" => Status::InProgress,
163 "ready" => Status::Ready,
164 "done" => Status::Done,
165 "blocked" => Status::Blocked,
166 "urgent" => Status::Urgent,
167 _ => Status::Todo,
168 };
169
170 let dependencies: Vec<TaskId> = serde_json::from_str(&deps_json).unwrap_or_default();
171 let metadata: Metadata = serde_json::from_str(&meta_json).unwrap_or_default();
172
173 let task = SchemaTaskBuilder::default()
174 .id(TaskId::parse(&id).unwrap_or_else(|_| TaskId::new("TASK", 0)))
175 .title(&title)
176 .status(status)
177 .column(&column_name)
178 .metadata(metadata)
179 .with_dependencies(dependencies)
180 .build()
181 .unwrap();
182
183 tasks.insert(task.id.clone(), task);
184 }
185
186 Ok(tasks)
187 }
188}
189
190#[async_trait]
191impl Storage for SqliteStorage {
192 async fn load_board(&self, id: &BoardId) -> Result<Board, CoreError> {
193 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
194
195 let mut stmt = conn.prepare("SELECT id, title FROM boards WHERE id = ?").map_err(|e| CoreError::Database(e.to_string()))?;
196 let board_info: Option<(String, String)> = stmt
197 .query_row([id.as_ref()], |row| Ok((row.get(0)?, row.get(1)?)))
198 .ok();
199
200 if let Some((board_id, title)) = board_info {
201 drop(stmt);
202 drop(conn);
203
204 let mut board = self.board_from_rows(&board_id, &title)?;
205 let tasks = self.load_tasks_for_board(&board_id)?;
206 board.tasks = tasks;
207 Ok(board)
208 } else {
209 Err(CoreError::BoardNotFound(id.to_string()))
210 }
211 }
212
213 async fn save_board(&self, board: &Board) -> Result<(), CoreError> {
214 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
215
216 let now = chrono::Utc::now().to_rfc3339();
217
218 conn.execute(
219 "INSERT OR REPLACE INTO boards (id, title, version, created_at, updated_at)
220 VALUES (?1, ?2, 0, ?3, ?3)",
221 [board.id.as_ref(), &board.title, &now],
222 ).map_err(|e| CoreError::Database(e.to_string()))?;
223
224 conn.execute(
225 "DELETE FROM columns WHERE board_id = ?",
226 [board.id.as_ref()],
227 ).map_err(|e| CoreError::Database(e.to_string()))?;
228
229 for (i, col) in board.columns.iter().enumerate() {
230 conn.execute(
231 "INSERT INTO columns (board_id, name, position) VALUES (?, ?, ?)",
232 rusqlite::params![board.id.as_ref(), col.name, i as i32],
233 ).map_err(|e| CoreError::Database(e.to_string()))?;
234 }
235
236 conn.execute(
237 "DELETE FROM tasks WHERE board_id = ?",
238 [board.id.as_ref()],
239 ).map_err(|e| CoreError::Database(e.to_string()))?;
240
241 for (i, task) in board.tasks.values().enumerate() {
242 let status_str = match task.status {
243 Status::Todo => "todo",
244 Status::InProgress => "inprogress",
245 Status::Ready => "ready",
246 Status::Done => "done",
247 Status::Blocked => "blocked",
248 Status::Urgent => "urgent",
249 };
250
251 let deps_json = serde_json::to_string(&task.dependencies).unwrap_or_else(|_| "[]".to_string());
252 let meta_json = serde_json::to_string(&task.metadata).unwrap_or_else(|_| "{}".to_string());
253
254 conn.execute(
255 "INSERT INTO tasks (id, board_id, title, status, column_name, position, dependencies, metadata, created_at, updated_at)
256 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)",
257 rusqlite::params![
258 task.id.to_string(),
259 board.id.as_ref(),
260 task.title,
261 status_str,
262 task.column.to_string(),
263 i as i32,
264 deps_json,
265 meta_json,
266 now
267 ],
268 ).map_err(|e| CoreError::Database(e.to_string()))?;
269 }
270
271 Ok(())
272 }
273
274 async fn delete_board(&self, id: &BoardId) -> Result<(), CoreError> {
275 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
276
277 conn.execute("DELETE FROM boards WHERE id = ?", [id.as_ref()]).map_err(|e| CoreError::Database(e.to_string()))?;
278 Ok(())
279 }
280
281 async fn list_boards(&self) -> Result<Vec<BoardId>, CoreError> {
282 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
283
284 let mut stmt = conn.prepare("SELECT id FROM boards").map_err(|e| CoreError::Database(e.to_string()))?;
285 let rows = stmt.query_map([], |row| row.get::<_, String>(0)).map_err(|e| CoreError::Database(e.to_string()))?;
286
287 let mut ids = Vec::new();
288 for row in rows {
289 if let Ok(id) = row {
290 ids.push(BoardId::from(id));
291 }
292 }
293
294 Ok(ids)
295 }
296
297 async fn load_task(&self, board_id: &BoardId, task_id: &TaskId) -> Result<Task, CoreError> {
298 let board = self.load_board(board_id).await?;
299 board.tasks
300 .get(task_id)
301 .cloned()
302 .ok_or_else(|| CoreError::TaskNotFound(task_id.to_string()))
303 }
304
305 async fn save_task(&self, board_id: &BoardId, task: &Task) -> Result<(), CoreError> {
306 let mut board = self.load_board(board_id).await?;
307 board.tasks.insert(task.id.clone(), task.clone());
308 self.save_board(&board).await?;
309
310 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
311 self.fts.index_task(
312 &conn,
313 board_id,
314 &task.id,
315 &task.title,
316 "",
317 &task.metadata.tags,
318 )?;
319
320 Ok(())
321 }
322
323 async fn delete_task(&self, board_id: &BoardId, task_id: &TaskId) -> Result<(), CoreError> {
324 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
325
326 conn.execute(
327 "DELETE FROM tasks WHERE board_id = ? AND id = ?",
328 rusqlite::params![board_id.as_ref(), task_id.to_string()],
329 ).map_err(|e| CoreError::Database(e.to_string()))?;
330
331 self.fts.remove_task(&conn, board_id, task_id)?;
332
333 Ok(())
334 }
335
336 async fn query(&self, board_id: &BoardId, query: Query) -> Result<Vec<Task>, CoreError> {
337 let board = self.load_board(board_id).await?;
338 Ok(boarddown_core::execute_query(&board, query))
339 }
340
341 async fn search(&self, query: &str) -> Result<Vec<SearchResult>, CoreError> {
342 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
343 self.fts.search(&conn, query)
344 }
345
346 async fn search_in_board(&self, board_id: &BoardId, query: &str) -> Result<Vec<SearchResult>, CoreError> {
347 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
348 self.fts.search_in_board(&conn, board_id, query)
349 }
350
351 async fn get_changeset(&self, board_id: &BoardId, since: Version) -> Result<Changeset, CoreError> {
352 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
353
354 let mut stmt = conn.prepare(
355 "SELECT task_id, operation_type, operation_data, timestamp, client_id
356 FROM operations WHERE board_id = ? AND version > ? ORDER BY version"
357 ).map_err(|e| CoreError::Database(e.to_string()))?;
358
359 let rows = stmt.query_map(rusqlite::params![board_id.as_ref(), since.0], |row| {
360 Ok((
361 row.get::<_, String>(0)?,
362 row.get::<_, String>(1)?,
363 row.get::<_, String>(2)?,
364 row.get::<_, i64>(3)?,
365 row.get::<_, i64>(4)?,
366 ))
367 }).map_err(|e| CoreError::Database(e.to_string()))?;
368
369 let mut changeset = Changeset::new(board_id.clone(), since);
370
371 for row in rows {
372 let (task_id_str, _op_type, op_data, timestamp, client_id) = row.map_err(|e| CoreError::Database(e.to_string()))?;
373 let task_id = TaskId::parse(&task_id_str).unwrap_or_else(|_| TaskId::new("TASK", 0));
374
375 let operation = serde_json::from_str(&op_data).unwrap_or_else(|_| {
376 Operation::SetTitle {
377 old: String::new(),
378 new: String::new(),
379 }
380 });
381
382 changeset.add_operation(TaskOp {
383 task_id,
384 operation,
385 timestamp: timestamp as u64,
386 client_id: client_id as u64,
387 });
388 }
389
390 Ok(changeset)
391 }
392
393 async fn apply_changeset(&self, board_id: &BoardId, changeset: &Changeset) -> Result<(), CoreError> {
394 let mut board = match self.load_board(board_id).await {
395 Ok(b) => b,
396 Err(CoreError::BoardNotFound(_)) => {
397 Board::new(board_id.as_ref(), "Imported Board")
398 }
399 Err(e) => return Err(e),
400 };
401
402 changeset.apply(&mut board)?;
403 self.save_board(&board).await?;
404
405 let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
406
407 for op in &changeset.operations {
408 let op_type = serde_json::to_string(&op.operation).unwrap_or_default();
409 conn.execute(
410 "INSERT INTO operations (board_id, task_id, operation_type, operation_data, timestamp, client_id, version)
411 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
412 rusqlite::params![
413 board_id.as_ref(),
414 op.task_id.to_string(),
415 "update",
416 op_type,
417 op.timestamp as i64,
418 op.client_id as i64,
419 op.timestamp as i64
420 ],
421 ).map_err(|e| CoreError::Database(e.to_string()))?;
422 }
423
424 Ok(())
425 }
426}