// meshlet_core/store.rs

use std::path::Path;

use rusqlite::{Connection, OpenFlags, params};

use crate::error::Result;
6
/// Value written to `PRAGMA user_version` once the schema below has been created.
const SCHEMA_VERSION: i32 = 1;

/// DDL for the `loro_doc` table: an integer-keyed row holding a `snapshot`
/// blob and a `vv` blob (presumably the document's version vector — confirm
/// against the caller that produces it).
static CREATE_LORO_DOC: &str = "
CREATE TABLE IF NOT EXISTS loro_doc (
    id       INTEGER PRIMARY KEY,
    snapshot BLOB NOT NULL,
    vv       BLOB NOT NULL
);
";

/// DDL for the `bookmarks` table, keyed by a text `id`.
static CREATE_BOOKMARKS: &str = "
CREATE TABLE IF NOT EXISTS bookmarks (
    id              TEXT PRIMARY KEY,
    url             TEXT NOT NULL,
    title           TEXT NOT NULL DEFAULT '',
    desc            TEXT NOT NULL DEFAULT '',
    immutable_title INTEGER NOT NULL DEFAULT 0,
    created_at      INTEGER,
    updated_at      INTEGER
);
";

/// DDL for the `bookmark_tags` join table. Rows are removed automatically
/// when the owning bookmark is deleted (`ON DELETE CASCADE`), provided
/// foreign-key enforcement is enabled on the connection.
static CREATE_BOOKMARK_TAGS: &str = "
CREATE TABLE IF NOT EXISTS bookmark_tags (
    bookmark_id TEXT NOT NULL REFERENCES bookmarks(id) ON DELETE CASCADE,
    tag         TEXT NOT NULL,
    PRIMARY KEY (bookmark_id, tag)
);
";

/// Index on `bookmark_tags(tag)`.
static CREATE_INDEX_TAG: &str =
    "CREATE INDEX IF NOT EXISTS idx_bookmark_tags_tag ON bookmark_tags(tag);";
/// Index on `bookmarks(url)`.
static CREATE_INDEX_URL: &str =
    "CREATE INDEX IF NOT EXISTS idx_bookmarks_url ON bookmarks(url);";

/// DDL for the `meta` key/value table; values are stored as blobs.
static CREATE_META: &str = "
CREATE TABLE IF NOT EXISTS meta (
    key   TEXT PRIMARY KEY,
    value BLOB NOT NULL
);
";
48
49pub struct Store {
50    conn: Connection,
51}
52
53impl Store {
54    pub fn open(path: &Path) -> Result<Self> {
55        let conn = Connection::open_with_flags(
56            path,
57            OpenFlags::SQLITE_OPEN_READ_WRITE
58                | OpenFlags::SQLITE_OPEN_CREATE
59                | OpenFlags::SQLITE_OPEN_NO_MUTEX,
60        )?;
61
62        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
63
64        let store = Self { conn };
65        store.migrate()?;
66        store.register_functions()?;
67        Ok(store)
68    }
69
70    pub fn open_in_memory() -> Result<Self> {
71        let conn = Connection::open_in_memory()?;
72
73        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
74
75        let store = Self { conn };
76        store.migrate()?;
77        store.register_functions()?;
78        Ok(store)
79    }
80
81    pub fn connection(&self) -> &Connection {
82        &self.conn
83    }
84
85    pub fn save_snapshot(&self, snapshot: &[u8], vv: &[u8]) -> Result<()> {
86        self.conn.execute(
87            "INSERT OR REPLACE INTO loro_doc (id, snapshot, vv) VALUES (1, ?1, ?2)",
88            params![snapshot, vv],
89        )?;
90        Ok(())
91    }
92
93    pub fn load_snapshot(&self) -> Result<Option<Vec<u8>>> {
94        let mut stmt = self
95            .conn
96            .prepare("SELECT snapshot FROM loro_doc WHERE id = 1")?;
97
98        let result = stmt.query_row([], |row| row.get(0)).optional()?;
99        Ok(result)
100    }
101
102    pub fn load_vv(&self) -> Result<Option<Vec<u8>>> {
103        let mut stmt = self
104            .conn
105            .prepare("SELECT vv FROM loro_doc WHERE id = 1")?;
106
107        let result = stmt.query_row([], |row| row.get(0)).optional()?;
108        Ok(result)
109    }
110
111    pub fn set_meta(&self, key: &str, value: &[u8]) -> Result<()> {
112        self.conn.execute(
113            "INSERT OR REPLACE INTO meta (key, value) VALUES (?1, ?2)",
114            params![key, value],
115        )?;
116        Ok(())
117    }
118
119    pub fn get_meta(&self, key: &str) -> Result<Option<Vec<u8>>> {
120        let mut stmt = self
121            .conn
122            .prepare("SELECT value FROM meta WHERE key = ?1")?;
123
124        let result = stmt
125            .query_row(params![key], |row| row.get(0))
126            .optional()?;
127        Ok(result)
128    }
129
130    pub fn set_meta_string(&self, key: &str, value: &str) -> Result<()> {
131        self.set_meta(key, value.as_bytes())
132    }
133
134    pub fn get_meta_string(&self, key: &str) -> Result<Option<String>> {
135        match self.get_meta(key)? {
136            Some(data) => Ok(String::from_utf8(data).ok()),
137            None => Ok(None),
138        }
139    }
140
141    pub fn upsert_bookmark(&self, b: &crate::model::Bookmark) -> Result<()> {
142        self.conn.execute(
143            "INSERT INTO bookmarks (id, url, title, desc, immutable_title, created_at, updated_at)
144             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
145             ON CONFLICT(id) DO UPDATE SET
146               url = excluded.url,
147               title = excluded.title,
148               desc = excluded.desc,
149               immutable_title = excluded.immutable_title,
150               updated_at = excluded.updated_at",
151            params![
152                b.id.as_str(),
153                b.url,
154                b.title,
155                b.desc,
156                b.flags & 0x01,
157                b.created_at,
158                b.updated_at
159            ],
160        )?;
161
162        self.replace_tags(b.id.as_str(), &b.tags)?;
163        Ok(())
164    }
165
166    pub fn delete_bookmark_mirror(&self, id: &str) -> Result<()> {
167        self.conn
168            .execute("DELETE FROM bookmarks WHERE id = ?1", params![id])?;
169        Ok(())
170    }
171
172    pub fn replace_tags(
173        &self,
174        bookmark_id: &str,
175        tags: &std::collections::BTreeSet<String>,
176    ) -> Result<()> {
177        let tx = self.conn.unchecked_transaction()?;
178
179        tx.execute(
180            "DELETE FROM bookmark_tags WHERE bookmark_id = ?1",
181            params![bookmark_id],
182        )?;
183
184        let mut stmt =
185            tx.prepare("INSERT INTO bookmark_tags (bookmark_id, tag) VALUES (?1, ?2)")?;
186
187        for tag in tags {
188            stmt.execute(params![bookmark_id, tag])?;
189        }
190
191        drop(stmt);
192        tx.commit()?;
193        Ok(())
194    }
195
196    fn migrate(&self) -> Result<()> {
197        let current_version: i32 = self
198            .conn
199            .query_row("PRAGMA user_version", [], |row| row.get(0))?;
200
201        if current_version < 1 {
202            let tx = self.conn.unchecked_transaction()?;
203
204            tx.execute_batch(CREATE_META)?;
205            tx.execute_batch(CREATE_LORO_DOC)?;
206            tx.execute_batch(CREATE_BOOKMARKS)?;
207            tx.execute_batch(CREATE_BOOKMARK_TAGS)?;
208            tx.execute_batch(CREATE_INDEX_TAG)?;
209            tx.execute_batch(CREATE_INDEX_URL)?;
210            tx.pragma_update(None, "user_version", SCHEMA_VERSION)?;
211
212            tx.commit()?;
213        }
214
215        Ok(())
216    }
217
218    fn register_functions(&self) -> Result<()> {
219        use rusqlite::functions::FunctionFlags;
220
221        self.conn.create_scalar_function(
222            "netloc",
223            1,
224            FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC,
225            |ctx| {
226                let s = ctx.get_raw(0).as_str().unwrap_or("");
227                if let Ok(parsed) = url::Url::parse(s) {
228                    match parsed.host_str() {
229                        Some(host) => Ok(host.to_string()),
230                        None => Ok(String::new()),
231                    }
232                } else {
233                    Ok(String::new())
234                }
235            },
236        )?;
237
238        self.conn.create_scalar_function(
239            "regexp",
240            2,
241            FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC,
242            |ctx| {
243                let pattern = ctx.get_raw(0).as_str().unwrap_or("");
244                let text = ctx.get_raw(1).as_str().unwrap_or("");
245                match regex::Regex::new(pattern) {
246                    Ok(re) => Ok(re.is_match(text) as i32),
247                    Err(_) => Ok(0),
248                }
249            },
250        )?;
251
252        Ok(())
253    }
254}
255
256trait OptionalExt<T> {
257    fn optional(self) -> std::result::Result<Option<T>, rusqlite::Error>;
258}
259
260impl<T> OptionalExt<T> for std::result::Result<T, rusqlite::Error> {
261    fn optional(self) -> std::result::Result<Option<T>, rusqlite::Error> {
262        match self {
263            Ok(v) => Ok(Some(v)),
264            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
265            Err(e) => Err(e),
266        }
267    }
268}