ldk_node/io/sqlite_store/
mod.rs1use crate::io::utils::check_namespace_key_validity;
10
11use lightning::io;
12use lightning::util::persist::KVStore;
13use lightning::util::string::PrintableString;
14
15use rusqlite::{named_params, Connection};
16
17use std::fs;
18use std::path::PathBuf;
19use std::sync::{Arc, Mutex};
20
21mod migrations;
22
/// Database file name `ldk_node_data.sqlite`.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the file name
/// callers pass to [`SqliteStore::new`] for LDK Node's own data — confirm against callers.
pub const SQLITE_DB_FILE_NAME: &str = "ldk_node_data.sqlite";
/// Table name `ldk_node_data`.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the table name
/// callers pass to [`SqliteStore::new`] for LDK Node's own data — confirm against callers.
pub const KV_TABLE_NAME: &str = "ldk_node_data";

/// Database file name used by [`SqliteStore::new`] when no `db_file_name` is given.
pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite";

/// Table name used by [`SqliteStore::new`] when no `kv_table_name` is given.
pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data";

// The schema version this code expects in SQLite's `user_version` pragma. Fresh databases
// (version 0) are stamped with it, older versions are migrated up to it, and newer versions
// are rejected when opening the store.
const SCHEMA_USER_VERSION: u16 = 2;
36
/// A [`KVStore`] implementation that keeps its data in a table of an SQLite database.
pub struct SqliteStore {
    // The open database connection; every store operation locks this mutex before use.
    connection: Arc<Mutex<Connection>>,
    // Directory that contains the database file, as passed to `new`.
    data_dir: PathBuf,
    // Name of the table all reads and writes are issued against.
    kv_table_name: String,
}
45
46impl SqliteStore {
47 pub fn new(
54 data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>,
55 ) -> io::Result<Self> {
56 let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string());
57 let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string());
58
59 fs::create_dir_all(data_dir.clone()).map_err(|e| {
60 let msg = format!(
61 "Failed to create database destination directory {}: {}",
62 data_dir.display(),
63 e
64 );
65 io::Error::new(io::ErrorKind::Other, msg)
66 })?;
67 let mut db_file_path = data_dir.clone();
68 db_file_path.push(db_file_name);
69
70 let mut connection = Connection::open(db_file_path.clone()).map_err(|e| {
71 let msg =
72 format!("Failed to open/create database file {}: {}", db_file_path.display(), e);
73 io::Error::new(io::ErrorKind::Other, msg)
74 })?;
75
76 let sql = format!("SELECT user_version FROM pragma_user_version");
77 let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap();
78
79 if version_res == 0 {
80 connection
82 .pragma(
83 Some(rusqlite::DatabaseName::Main),
84 "user_version",
85 SCHEMA_USER_VERSION,
86 |_| Ok(()),
87 )
88 .map_err(|e| {
89 let msg = format!("Failed to set PRAGMA user_version: {}", e);
90 io::Error::new(io::ErrorKind::Other, msg)
91 })?;
92 } else if version_res < SCHEMA_USER_VERSION {
93 migrations::migrate_schema(
94 &mut connection,
95 &kv_table_name,
96 version_res,
97 SCHEMA_USER_VERSION,
98 )?;
99 } else if version_res > SCHEMA_USER_VERSION {
100 let msg = format!(
101 "Failed to open database: incompatible schema version {}. Expected: {}",
102 version_res, SCHEMA_USER_VERSION
103 );
104 return Err(io::Error::new(io::ErrorKind::Other, msg));
105 }
106
107 let sql = format!(
108 "CREATE TABLE IF NOT EXISTS {} (
109 primary_namespace TEXT NOT NULL,
110 secondary_namespace TEXT DEFAULT \"\" NOT NULL,
111 key TEXT NOT NULL CHECK (key <> ''),
112 value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
113 );",
114 kv_table_name
115 );
116
117 connection.execute(&sql, []).map_err(|e| {
118 let msg = format!("Failed to create table {}: {}", kv_table_name, e);
119 io::Error::new(io::ErrorKind::Other, msg)
120 })?;
121
122 let connection = Arc::new(Mutex::new(connection));
123 Ok(Self { connection, data_dir, kv_table_name })
124 }
125
126 pub fn get_data_dir(&self) -> PathBuf {
128 self.data_dir.clone()
129 }
130}
131
132impl KVStore for SqliteStore {
133 fn read(
134 &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
135 ) -> io::Result<Vec<u8>> {
136 check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
137
138 let locked_conn = self.connection.lock().unwrap();
139 let sql =
140 format!("SELECT value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;",
141 self.kv_table_name);
142
143 let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
144 let msg = format!("Failed to prepare statement: {}", e);
145 io::Error::new(io::ErrorKind::Other, msg)
146 })?;
147
148 let res = stmt
149 .query_row(
150 named_params! {
151 ":primary_namespace": primary_namespace,
152 ":secondary_namespace": secondary_namespace,
153 ":key": key,
154 },
155 |row| row.get(0),
156 )
157 .map_err(|e| match e {
158 rusqlite::Error::QueryReturnedNoRows => {
159 let msg = format!(
160 "Failed to read as key could not be found: {}/{}/{}",
161 PrintableString(primary_namespace),
162 PrintableString(secondary_namespace),
163 PrintableString(key)
164 );
165 io::Error::new(io::ErrorKind::NotFound, msg)
166 },
167 e => {
168 let msg = format!(
169 "Failed to read from key {}/{}/{}: {}",
170 PrintableString(primary_namespace),
171 PrintableString(secondary_namespace),
172 PrintableString(key),
173 e
174 );
175 io::Error::new(io::ErrorKind::Other, msg)
176 },
177 })?;
178 Ok(res)
179 }
180
181 fn write(
182 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
183 ) -> io::Result<()> {
184 check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
185
186 let locked_conn = self.connection.lock().unwrap();
187
188 let sql = format!(
189 "INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);",
190 self.kv_table_name
191 );
192
193 let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
194 let msg = format!("Failed to prepare statement: {}", e);
195 io::Error::new(io::ErrorKind::Other, msg)
196 })?;
197
198 stmt.execute(named_params! {
199 ":primary_namespace": primary_namespace,
200 ":secondary_namespace": secondary_namespace,
201 ":key": key,
202 ":value": buf,
203 })
204 .map(|_| ())
205 .map_err(|e| {
206 let msg = format!(
207 "Failed to write to key {}/{}/{}: {}",
208 PrintableString(primary_namespace),
209 PrintableString(secondary_namespace),
210 PrintableString(key),
211 e
212 );
213 io::Error::new(io::ErrorKind::Other, msg)
214 })
215 }
216
217 fn remove(
218 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
219 ) -> io::Result<()> {
220 check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
221
222 let locked_conn = self.connection.lock().unwrap();
223
224 let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name);
225
226 let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
227 let msg = format!("Failed to prepare statement: {}", e);
228 io::Error::new(io::ErrorKind::Other, msg)
229 })?;
230
231 stmt.execute(named_params! {
232 ":primary_namespace": primary_namespace,
233 ":secondary_namespace": secondary_namespace,
234 ":key": key,
235 })
236 .map_err(|e| {
237 let msg = format!(
238 "Failed to delete key {}/{}/{}: {}",
239 PrintableString(primary_namespace),
240 PrintableString(secondary_namespace),
241 PrintableString(key),
242 e
243 );
244 io::Error::new(io::ErrorKind::Other, msg)
245 })?;
246 Ok(())
247 }
248
249 fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
250 check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
251
252 let locked_conn = self.connection.lock().unwrap();
253
254 let sql = format!(
255 "SELECT key FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace",
256 self.kv_table_name
257 );
258 let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
259 let msg = format!("Failed to prepare statement: {}", e);
260 io::Error::new(io::ErrorKind::Other, msg)
261 })?;
262
263 let mut keys = Vec::new();
264
265 let rows_iter = stmt
266 .query_map(
267 named_params! {
268 ":primary_namespace": primary_namespace,
269 ":secondary_namespace": secondary_namespace,
270 },
271 |row| row.get(0),
272 )
273 .map_err(|e| {
274 let msg = format!("Failed to retrieve queried rows: {}", e);
275 io::Error::new(io::ErrorKind::Other, msg)
276 })?;
277
278 for k in rows_iter {
279 keys.push(k.map_err(|e| {
280 let msg = format!("Failed to retrieve queried rows: {}", e);
281 io::Error::new(io::ErrorKind::Other, msg)
282 })?);
283 }
284
285 Ok(keys)
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::test_utils::{
        do_read_write_remove_list_persist, do_test_store, random_storage_path,
    };

    // Test-only cleanup: remove the store's data directory once the store is dropped.
    impl Drop for SqliteStore {
        fn drop(&mut self) {
            if let Err(e) = fs::remove_dir_all(&self.data_dir) {
                println!("Failed to remove test store directory: {}", e);
            }
        }
    }

    // Runs the shared read/write/remove/list persistence checks against a fresh store.
    #[test]
    fn read_write_remove_list_persist() {
        let store_dir = random_storage_path().join("read_write_remove_list_persist");
        let db_name = Some("test_db".to_string());
        let table_name = Some("test_table".to_string());

        let store = SqliteStore::new(store_dir, db_name, table_name).unwrap();
        do_read_write_remove_list_persist(&store);
    }

    // Runs the shared two-store test with two database files in the same directory.
    #[test]
    fn test_sqlite_store() {
        let store_dir = random_storage_path().join("test_sqlite_store");

        let store_0 = SqliteStore::new(
            store_dir.clone(),
            Some("test_db_0".to_string()),
            Some("test_table".to_string()),
        )
        .unwrap();
        let store_1 = SqliteStore::new(
            store_dir,
            Some("test_db_1".to_string()),
            Some("test_table".to_string()),
        )
        .unwrap();

        do_test_store(&store_0, &store_1)
    }
}
337
#[cfg(ldk_bench)]
pub mod bench {
    use super::SqliteStore;
    use criterion::Criterion;

    /// Runs `bench_two_sends` under the name `bench_sqlite_persisted_sends`, with two stores
    /// opened in the relative directories `bench_sqlite_store_a` and `bench_sqlite_store_b`
    /// using the default database file and table names.
    pub fn bench_sends(bench: &mut Criterion) {
        let first_store = SqliteStore::new("bench_sqlite_store_a".into(), None, None).unwrap();
        let second_store = SqliteStore::new("bench_sqlite_store_b".into(), None, None).unwrap();

        lightning::ln::channelmanager::bench::bench_two_sends(
            bench,
            "bench_sqlite_persisted_sends",
            first_store,
            second_store,
        );
    }
}