Skip to main content

boarddown_db/
backend.rs

1use std::path::Path;
2use std::sync::Mutex;
3use async_trait::async_trait;
4
5use boarddown_core::{Board, BoardId, Task, TaskId, Query, Changeset, Version, Storage, Error as CoreError, SearchResult};
6use boarddown_schema::{Status, Column, Operation, TaskOp, Metadata, TaskBuilder as SchemaTaskBuilder};
7use crate::fts::FullTextSearch;
8
9#[derive(Debug)]
10pub struct SqliteStorage {
11    path: std::path::PathBuf,
12    conn: Mutex<rusqlite::Connection>,
13    fts: FullTextSearch,
14}
15
16impl SqliteStorage {
17    pub async fn new(path: impl AsRef<Path>) -> Result<Self, CoreError> {
18        let path = path.as_ref().to_path_buf();
19        let conn = rusqlite::Connection::open(&path).map_err(|e| CoreError::Database(e.to_string()))?;
20        
21        let fts = FullTextSearch::new(true);
22        
23        Ok(Self {
24            path,
25            conn: Mutex::new(conn),
26            fts,
27        })
28    }
29
30    pub async fn init(&self) -> Result<(), CoreError> {
31        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
32        
33        conn.execute(
34            "CREATE TABLE IF NOT EXISTS boards (
35                id TEXT PRIMARY KEY,
36                title TEXT NOT NULL,
37                version INTEGER NOT NULL DEFAULT 0,
38                created_at TEXT NOT NULL,
39                updated_at TEXT NOT NULL
40            )",
41            [],
42        ).map_err(|e| CoreError::Database(e.to_string()))?;
43
44        conn.execute(
45            "CREATE TABLE IF NOT EXISTS columns (
46                id INTEGER PRIMARY KEY AUTOINCREMENT,
47                board_id TEXT NOT NULL,
48                name TEXT NOT NULL,
49                position INTEGER NOT NULL,
50                FOREIGN KEY (board_id) REFERENCES boards(id) ON DELETE CASCADE
51            )",
52            [],
53        ).map_err(|e| CoreError::Database(e.to_string()))?;
54
55        conn.execute(
56            "CREATE TABLE IF NOT EXISTS tasks (
57                id TEXT NOT NULL,
58                board_id TEXT NOT NULL,
59                title TEXT NOT NULL,
60                status TEXT NOT NULL,
61                column_name TEXT NOT NULL,
62                position INTEGER NOT NULL DEFAULT 0,
63                dependencies TEXT NOT NULL DEFAULT '[]',
64                metadata TEXT NOT NULL DEFAULT '{}',
65                created_at TEXT NOT NULL,
66                updated_at TEXT NOT NULL,
67                PRIMARY KEY (id, board_id),
68                FOREIGN KEY (board_id) REFERENCES boards(id) ON DELETE CASCADE
69            )",
70            [],
71        ).map_err(|e| CoreError::Database(e.to_string()))?;
72
73        conn.execute(
74            "CREATE TABLE IF NOT EXISTS operations (
75                id INTEGER PRIMARY KEY AUTOINCREMENT,
76                board_id TEXT NOT NULL,
77                task_id TEXT NOT NULL,
78                operation_type TEXT NOT NULL,
79                operation_data TEXT NOT NULL,
80                timestamp INTEGER NOT NULL,
81                client_id INTEGER NOT NULL,
82                version INTEGER NOT NULL,
83                FOREIGN KEY (board_id) REFERENCES boards(id) ON DELETE CASCADE
84            )",
85            [],
86        ).map_err(|e| CoreError::Database(e.to_string()))?;
87
88        conn.execute(
89            "CREATE INDEX IF NOT EXISTS idx_tasks_board_id ON tasks(board_id)",
90            [],
91        ).map_err(|e| CoreError::Database(e.to_string()))?;
92
93        conn.execute(
94            "CREATE INDEX IF NOT EXISTS idx_operations_board_version ON operations(board_id, version)",
95            [],
96        ).map_err(|e| CoreError::Database(e.to_string()))?;
97
98        self.fts.init(&conn)?;
99        
100        Ok(())
101    }
102
103    pub async fn run_migrations(&self) -> Result<(), CoreError> {
104        Ok(())
105    }
106
107    pub async fn enable_wal(&self) -> Result<(), CoreError> {
108        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
109        conn.execute_batch("PRAGMA journal_mode=WAL;").map_err(|e| CoreError::Database(e.to_string()))?;
110        Ok(())
111    }
112
113    fn board_from_rows(&self, board_id: &str, title: &str) -> Result<Board, CoreError> {
114        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
115        
116        let mut board = Board::new(board_id, title);
117        
118        let mut stmt = conn.prepare(
119            "SELECT name, position FROM columns WHERE board_id = ? ORDER BY position"
120        ).map_err(|e| CoreError::Database(e.to_string()))?;
121        
122        let columns: Vec<(String, i32)> = stmt
123            .query_map([board_id], |row| Ok((row.get(0)?, row.get(1)?)))
124            .map_err(|e| CoreError::Database(e.to_string()))?
125            .filter_map(|r| r.ok())
126            .collect();
127        
128        for (name, _pos) in columns {
129            board.add_column(Column::new(&name));
130        }
131        
132        Ok(board)
133    }
134
135    fn load_tasks_for_board(&self, board_id: &str) -> Result<std::collections::HashMap<TaskId, Task>, CoreError> {
136        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
137        
138        let mut stmt = conn.prepare(
139            "SELECT id, title, status, column_name, position, dependencies, metadata, created_at, updated_at 
140             FROM tasks WHERE board_id = ?"
141        ).map_err(|e| CoreError::Database(e.to_string()))?;
142        
143        let rows = stmt.query_map([board_id], |row| {
144            let id: String = row.get(0)?;
145            let title: String = row.get(1)?;
146            let status: String = row.get(2)?;
147            let column_name: String = row.get(3)?;
148            let _position: i32 = row.get(4)?;
149            let dependencies_json: String = row.get(5)?;
150            let metadata_json: String = row.get(6)?;
151            
152            Ok((id, title, status, column_name, dependencies_json, metadata_json))
153        }).map_err(|e| CoreError::Database(e.to_string()))?;
154        
155        let mut tasks = std::collections::HashMap::new();
156        
157        for row in rows {
158            let (id, title, status_str, column_name, deps_json, meta_json) = row.map_err(|e| CoreError::Database(e.to_string()))?;
159            
160            let status = match status_str.as_str() {
161                "todo" => Status::Todo,
162                "inprogress" => Status::InProgress,
163                "ready" => Status::Ready,
164                "done" => Status::Done,
165                "blocked" => Status::Blocked,
166                "urgent" => Status::Urgent,
167                _ => Status::Todo,
168            };
169            
170            let dependencies: Vec<TaskId> = serde_json::from_str(&deps_json).unwrap_or_default();
171            let metadata: Metadata = serde_json::from_str(&meta_json).unwrap_or_default();
172            
173            let task = SchemaTaskBuilder::default()
174                .id(TaskId::parse(&id).unwrap_or_else(|_| TaskId::new("TASK", 0)))
175                .title(&title)
176                .status(status)
177                .column(&column_name)
178                .metadata(metadata)
179                .with_dependencies(dependencies)
180                .build()
181                .unwrap();
182            
183            tasks.insert(task.id.clone(), task);
184        }
185        
186        Ok(tasks)
187    }
188}
189
190#[async_trait]
191impl Storage for SqliteStorage {
192    async fn load_board(&self, id: &BoardId) -> Result<Board, CoreError> {
193        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
194        
195        let mut stmt = conn.prepare("SELECT id, title FROM boards WHERE id = ?").map_err(|e| CoreError::Database(e.to_string()))?;
196        let board_info: Option<(String, String)> = stmt
197            .query_row([id.as_ref()], |row| Ok((row.get(0)?, row.get(1)?)))
198            .ok();
199        
200        if let Some((board_id, title)) = board_info {
201            drop(stmt);
202            drop(conn);
203            
204            let mut board = self.board_from_rows(&board_id, &title)?;
205            let tasks = self.load_tasks_for_board(&board_id)?;
206            board.tasks = tasks;
207            Ok(board)
208        } else {
209            Err(CoreError::BoardNotFound(id.to_string()))
210        }
211    }
212
213    async fn save_board(&self, board: &Board) -> Result<(), CoreError> {
214        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
215        
216        let now = chrono::Utc::now().to_rfc3339();
217        
218        conn.execute(
219            "INSERT OR REPLACE INTO boards (id, title, version, created_at, updated_at) 
220             VALUES (?1, ?2, 0, ?3, ?3)",
221            [board.id.as_ref(), &board.title, &now],
222        ).map_err(|e| CoreError::Database(e.to_string()))?;
223        
224        conn.execute(
225            "DELETE FROM columns WHERE board_id = ?",
226            [board.id.as_ref()],
227        ).map_err(|e| CoreError::Database(e.to_string()))?;
228        
229        for (i, col) in board.columns.iter().enumerate() {
230            conn.execute(
231                "INSERT INTO columns (board_id, name, position) VALUES (?, ?, ?)",
232                rusqlite::params![board.id.as_ref(), col.name, i as i32],
233            ).map_err(|e| CoreError::Database(e.to_string()))?;
234        }
235        
236        conn.execute(
237            "DELETE FROM tasks WHERE board_id = ?",
238            [board.id.as_ref()],
239        ).map_err(|e| CoreError::Database(e.to_string()))?;
240        
241        for (i, task) in board.tasks.values().enumerate() {
242            let status_str = match task.status {
243                Status::Todo => "todo",
244                Status::InProgress => "inprogress",
245                Status::Ready => "ready",
246                Status::Done => "done",
247                Status::Blocked => "blocked",
248                Status::Urgent => "urgent",
249            };
250            
251            let deps_json = serde_json::to_string(&task.dependencies).unwrap_or_else(|_| "[]".to_string());
252            let meta_json = serde_json::to_string(&task.metadata).unwrap_or_else(|_| "{}".to_string());
253            
254            conn.execute(
255                "INSERT INTO tasks (id, board_id, title, status, column_name, position, dependencies, metadata, created_at, updated_at)
256                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)",
257                rusqlite::params![
258                    task.id.to_string(),
259                    board.id.as_ref(),
260                    task.title,
261                    status_str,
262                    task.column.to_string(),
263                    i as i32,
264                    deps_json,
265                    meta_json,
266                    now
267                ],
268            ).map_err(|e| CoreError::Database(e.to_string()))?;
269        }
270        
271        Ok(())
272    }
273
274    async fn delete_board(&self, id: &BoardId) -> Result<(), CoreError> {
275        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
276        
277        conn.execute("DELETE FROM boards WHERE id = ?", [id.as_ref()]).map_err(|e| CoreError::Database(e.to_string()))?;
278        Ok(())
279    }
280
281    async fn list_boards(&self) -> Result<Vec<BoardId>, CoreError> {
282        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
283        
284        let mut stmt = conn.prepare("SELECT id FROM boards").map_err(|e| CoreError::Database(e.to_string()))?;
285        let rows = stmt.query_map([], |row| row.get::<_, String>(0)).map_err(|e| CoreError::Database(e.to_string()))?;
286        
287        let mut ids = Vec::new();
288        for row in rows {
289            if let Ok(id) = row {
290                ids.push(BoardId::from(id));
291            }
292        }
293        
294        Ok(ids)
295    }
296
297    async fn load_task(&self, board_id: &BoardId, task_id: &TaskId) -> Result<Task, CoreError> {
298        let board = self.load_board(board_id).await?;
299        board.tasks
300            .get(task_id)
301            .cloned()
302            .ok_or_else(|| CoreError::TaskNotFound(task_id.to_string()))
303    }
304
305    async fn save_task(&self, board_id: &BoardId, task: &Task) -> Result<(), CoreError> {
306        let mut board = self.load_board(board_id).await?;
307        board.tasks.insert(task.id.clone(), task.clone());
308        self.save_board(&board).await?;
309        
310        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
311        self.fts.index_task(
312            &conn,
313            board_id,
314            &task.id,
315            &task.title,
316            "",
317            &task.metadata.tags,
318        )?;
319        
320        Ok(())
321    }
322
323    async fn delete_task(&self, board_id: &BoardId, task_id: &TaskId) -> Result<(), CoreError> {
324        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
325        
326        conn.execute(
327            "DELETE FROM tasks WHERE board_id = ? AND id = ?",
328            rusqlite::params![board_id.as_ref(), task_id.to_string()],
329        ).map_err(|e| CoreError::Database(e.to_string()))?;
330        
331        self.fts.remove_task(&conn, board_id, task_id)?;
332        
333        Ok(())
334    }
335
336    async fn query(&self, board_id: &BoardId, query: Query) -> Result<Vec<Task>, CoreError> {
337        let board = self.load_board(board_id).await?;
338        Ok(boarddown_core::execute_query(&board, query))
339    }
340    
341    async fn search(&self, query: &str) -> Result<Vec<SearchResult>, CoreError> {
342        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
343        self.fts.search(&conn, query)
344    }
345
346    async fn search_in_board(&self, board_id: &BoardId, query: &str) -> Result<Vec<SearchResult>, CoreError> {
347        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
348        self.fts.search_in_board(&conn, board_id, query)
349    }
350
351    async fn get_changeset(&self, board_id: &BoardId, since: Version) -> Result<Changeset, CoreError> {
352        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
353        
354        let mut stmt = conn.prepare(
355            "SELECT task_id, operation_type, operation_data, timestamp, client_id 
356             FROM operations WHERE board_id = ? AND version > ? ORDER BY version"
357        ).map_err(|e| CoreError::Database(e.to_string()))?;
358        
359        let rows = stmt.query_map(rusqlite::params![board_id.as_ref(), since.0], |row| {
360            Ok((
361                row.get::<_, String>(0)?,
362                row.get::<_, String>(1)?,
363                row.get::<_, String>(2)?,
364                row.get::<_, i64>(3)?,
365                row.get::<_, i64>(4)?,
366            ))
367        }).map_err(|e| CoreError::Database(e.to_string()))?;
368        
369        let mut changeset = Changeset::new(board_id.clone(), since);
370        
371        for row in rows {
372            let (task_id_str, _op_type, op_data, timestamp, client_id) = row.map_err(|e| CoreError::Database(e.to_string()))?;
373            let task_id = TaskId::parse(&task_id_str).unwrap_or_else(|_| TaskId::new("TASK", 0));
374            
375            let operation = serde_json::from_str(&op_data).unwrap_or_else(|_| {
376                Operation::SetTitle {
377                    old: String::new(),
378                    new: String::new(),
379                }
380            });
381            
382            changeset.add_operation(TaskOp {
383                task_id,
384                operation,
385                timestamp: timestamp as u64,
386                client_id: client_id as u64,
387            });
388        }
389        
390        Ok(changeset)
391    }
392
393    async fn apply_changeset(&self, board_id: &BoardId, changeset: &Changeset) -> Result<(), CoreError> {
394        let mut board = match self.load_board(board_id).await {
395            Ok(b) => b,
396            Err(CoreError::BoardNotFound(_)) => {
397                Board::new(board_id.as_ref(), "Imported Board")
398            }
399            Err(e) => return Err(e),
400        };
401        
402        changeset.apply(&mut board)?;
403        self.save_board(&board).await?;
404        
405        let conn = self.conn.lock().map_err(|e| CoreError::Storage(e.to_string()))?;
406        
407        for op in &changeset.operations {
408            let op_type = serde_json::to_string(&op.operation).unwrap_or_default();
409            conn.execute(
410                "INSERT INTO operations (board_id, task_id, operation_type, operation_data, timestamp, client_id, version)
411                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
412                rusqlite::params![
413                    board_id.as_ref(),
414                    op.task_id.to_string(),
415                    "update",
416                    op_type,
417                    op.timestamp as i64,
418                    op.client_id as i64,
419                    op.timestamp as i64
420                ],
421            ).map_err(|e| CoreError::Database(e.to_string()))?;
422        }
423        
424        Ok(())
425    }
426}