1use std::boxed::Box;
10use std::collections::HashMap;
11use std::fs;
12use std::future::Future;
13use std::path::PathBuf;
14use std::pin::Pin;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17
18use lightning::io;
19use lightning::util::persist::{KVStore, KVStoreSync};
20use lightning_types::string::PrintableString;
21use rusqlite::{named_params, Connection};
22
23use crate::io::utils::check_namespace_key_validity;
24
25mod migrations;
26
/// A SQLite database file name.
///
/// NOTE(review): not referenced anywhere else in this file; presumably the file name used by an
/// LDK Node-specific deployment of the store -- confirm against callers.
pub const SQLITE_DB_FILE_NAME: &str = "ldk_node_data.sqlite";
/// A key-value table name.
///
/// NOTE(review): not referenced anywhere else in this file; presumably the table paired with
/// [`SQLITE_DB_FILE_NAME`] -- confirm against callers.
pub const KV_TABLE_NAME: &str = "ldk_node_data";

/// The database file name used when no explicit `db_file_name` is given to the store.
pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite";

/// The table name used when no explicit `kv_table_name` is given to the store.
pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data";

// The schema version this code expects in SQLite's `user_version` pragma. Databases reporting an
// older (non-zero) version are migrated, databases reporting a newer one are rejected.
const SCHEMA_USER_VERSION: u16 = 2;
40
/// A key-value store backed by an SQLite database, implementing both [`KVStore`] and
/// [`KVStoreSync`].
pub struct SqliteStore {
    // The connection and per-key lock bookkeeping. Kept behind an `Arc` so the async `KVStore`
    // methods can hand a clone to the blocking tasks they spawn.
    inner: Arc<SqliteStoreInner>,

    // Source of the version assigned to every write/remove. `execute_locked_write` uses it to
    // skip an operation if a higher version was already applied to the same key.
    next_write_version: AtomicU64,
}
51
52impl SqliteStore {
53 pub fn new(
60 data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>,
61 ) -> io::Result<Self> {
62 let inner = Arc::new(SqliteStoreInner::new(data_dir, db_file_name, kv_table_name)?);
63 let next_write_version = AtomicU64::new(1);
64 Ok(Self { inner, next_write_version })
65 }
66
67 fn build_locking_key(
68 &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
69 ) -> String {
70 format!("{}#{}#{}", primary_namespace, secondary_namespace, key)
71 }
72
73 fn get_new_version_and_lock_ref(&self, locking_key: String) -> (Arc<Mutex<u64>>, u64) {
74 let version = self.next_write_version.fetch_add(1, Ordering::Relaxed);
75 if version == u64::MAX {
76 panic!("SqliteStore version counter overflowed");
77 }
78
79 let inner_lock_ref = self.inner.get_inner_lock_ref(locking_key);
82
83 (inner_lock_ref, version)
84 }
85
86 pub fn get_data_dir(&self) -> PathBuf {
88 self.inner.data_dir.clone()
89 }
90}
91
92impl KVStore for SqliteStore {
93 fn read(
94 &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
95 ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + Send>> {
96 let primary_namespace = primary_namespace.to_string();
97 let secondary_namespace = secondary_namespace.to_string();
98 let key = key.to_string();
99 let inner = Arc::clone(&self.inner);
100 let fut = tokio::task::spawn_blocking(move || {
101 inner.read_internal(&primary_namespace, &secondary_namespace, &key)
102 });
103 Box::pin(async move {
104 fut.await.unwrap_or_else(|e| {
105 let msg = format!("Failed to IO operation due join error: {}", e);
106 Err(io::Error::new(io::ErrorKind::Other, msg))
107 })
108 })
109 }
110
111 fn write(
112 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
113 ) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
114 let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
115 let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
116 let primary_namespace = primary_namespace.to_string();
117 let secondary_namespace = secondary_namespace.to_string();
118 let key = key.to_string();
119 let inner = Arc::clone(&self.inner);
120 let fut = tokio::task::spawn_blocking(move || {
121 inner.write_internal(
122 inner_lock_ref,
123 locking_key,
124 version,
125 &primary_namespace,
126 &secondary_namespace,
127 &key,
128 buf,
129 )
130 });
131 Box::pin(async move {
132 fut.await.unwrap_or_else(|e| {
133 let msg = format!("Failed to IO operation due join error: {}", e);
134 Err(io::Error::new(io::ErrorKind::Other, msg))
135 })
136 })
137 }
138
139 fn remove(
140 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
141 ) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
142 let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
143 let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
144 let primary_namespace = primary_namespace.to_string();
145 let secondary_namespace = secondary_namespace.to_string();
146 let key = key.to_string();
147 let inner = Arc::clone(&self.inner);
148 let fut = tokio::task::spawn_blocking(move || {
149 inner.remove_internal(
150 inner_lock_ref,
151 locking_key,
152 version,
153 &primary_namespace,
154 &secondary_namespace,
155 &key,
156 )
157 });
158 Box::pin(async move {
159 fut.await.unwrap_or_else(|e| {
160 let msg = format!("Failed to IO operation due join error: {}", e);
161 Err(io::Error::new(io::ErrorKind::Other, msg))
162 })
163 })
164 }
165
166 fn list(
167 &self, primary_namespace: &str, secondary_namespace: &str,
168 ) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + Send>> {
169 let primary_namespace = primary_namespace.to_string();
170 let secondary_namespace = secondary_namespace.to_string();
171 let inner = Arc::clone(&self.inner);
172 let fut = tokio::task::spawn_blocking(move || {
173 inner.list_internal(&primary_namespace, &secondary_namespace)
174 });
175 Box::pin(async move {
176 fut.await.unwrap_or_else(|e| {
177 let msg = format!("Failed to IO operation due join error: {}", e);
178 Err(io::Error::new(io::ErrorKind::Other, msg))
179 })
180 })
181 }
182}
183
184impl KVStoreSync for SqliteStore {
185 fn read(
186 &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
187 ) -> io::Result<Vec<u8>> {
188 self.inner.read_internal(primary_namespace, secondary_namespace, key)
189 }
190
191 fn write(
192 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
193 ) -> io::Result<()> {
194 let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
195 let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
196 self.inner.write_internal(
197 inner_lock_ref,
198 locking_key,
199 version,
200 primary_namespace,
201 secondary_namespace,
202 key,
203 buf,
204 )
205 }
206
207 fn remove(
208 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
209 ) -> io::Result<()> {
210 let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
211 let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
212 self.inner.remove_internal(
213 inner_lock_ref,
214 locking_key,
215 version,
216 primary_namespace,
217 secondary_namespace,
218 key,
219 )
220 }
221
222 fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
223 self.inner.list_internal(primary_namespace, secondary_namespace)
224 }
225}
226
/// State shared behind [`SqliteStore`]: the database connection plus per-key write bookkeeping.
struct SqliteStoreInner {
    // The SQLite connection; reads, writes, removals and listings each lock it while they run
    // their statement.
    connection: Arc<Mutex<Connection>>,
    // Directory containing the database file (created in `new` if missing).
    data_dir: PathBuf,
    // Name of the table holding all key-value rows.
    kv_table_name: String,
    // Per-key locks, indexed by the string built in `SqliteStore::build_locking_key`. The guarded
    // `u64` is the last version successfully applied to that key. Entries are created by
    // `get_inner_lock_ref` and removed by `clean_locks`.
    write_version_locks: Mutex<HashMap<String, Arc<Mutex<u64>>>>,
}
233
234impl SqliteStoreInner {
235 fn new(
236 data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>,
237 ) -> io::Result<Self> {
238 let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string());
239 let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string());
240
241 fs::create_dir_all(data_dir.clone()).map_err(|e| {
242 let msg = format!(
243 "Failed to create database destination directory {}: {}",
244 data_dir.display(),
245 e
246 );
247 io::Error::new(io::ErrorKind::Other, msg)
248 })?;
249 let mut db_file_path = data_dir.clone();
250 db_file_path.push(db_file_name);
251
252 let mut connection = Connection::open(db_file_path.clone()).map_err(|e| {
253 let msg =
254 format!("Failed to open/create database file {}: {}", db_file_path.display(), e);
255 io::Error::new(io::ErrorKind::Other, msg)
256 })?;
257
258 let sql = format!("SELECT user_version FROM pragma_user_version");
259 let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap();
260
261 if version_res == 0 {
262 connection
264 .pragma(
265 Some(rusqlite::DatabaseName::Main),
266 "user_version",
267 SCHEMA_USER_VERSION,
268 |_| Ok(()),
269 )
270 .map_err(|e| {
271 let msg = format!("Failed to set PRAGMA user_version: {}", e);
272 io::Error::new(io::ErrorKind::Other, msg)
273 })?;
274 } else if version_res < SCHEMA_USER_VERSION {
275 migrations::migrate_schema(
276 &mut connection,
277 &kv_table_name,
278 version_res,
279 SCHEMA_USER_VERSION,
280 )?;
281 } else if version_res > SCHEMA_USER_VERSION {
282 let msg = format!(
283 "Failed to open database: incompatible schema version {}. Expected: {}",
284 version_res, SCHEMA_USER_VERSION
285 );
286 return Err(io::Error::new(io::ErrorKind::Other, msg));
287 }
288
289 let sql = format!(
290 "CREATE TABLE IF NOT EXISTS {} (
291 primary_namespace TEXT NOT NULL,
292 secondary_namespace TEXT DEFAULT \"\" NOT NULL,
293 key TEXT NOT NULL CHECK (key <> ''),
294 value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
295 );",
296 kv_table_name
297 );
298
299 connection.execute(&sql, []).map_err(|e| {
300 let msg = format!("Failed to create table {}: {}", kv_table_name, e);
301 io::Error::new(io::ErrorKind::Other, msg)
302 })?;
303
304 let connection = Arc::new(Mutex::new(connection));
305 let write_version_locks = Mutex::new(HashMap::new());
306 Ok(Self { connection, data_dir, kv_table_name, write_version_locks })
307 }
308
309 fn get_inner_lock_ref(&self, locking_key: String) -> Arc<Mutex<u64>> {
310 let mut outer_lock = self.write_version_locks.lock().unwrap();
311 Arc::clone(&outer_lock.entry(locking_key).or_default())
312 }
313
314 fn read_internal(
315 &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
316 ) -> io::Result<Vec<u8>> {
317 check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
318
319 let locked_conn = self.connection.lock().unwrap();
320 let sql =
321 format!("SELECT value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;",
322 self.kv_table_name);
323
324 let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
325 let msg = format!("Failed to prepare statement: {}", e);
326 io::Error::new(io::ErrorKind::Other, msg)
327 })?;
328
329 let res = stmt
330 .query_row(
331 named_params! {
332 ":primary_namespace": primary_namespace,
333 ":secondary_namespace": secondary_namespace,
334 ":key": key,
335 },
336 |row| row.get(0),
337 )
338 .map_err(|e| match e {
339 rusqlite::Error::QueryReturnedNoRows => {
340 let msg = format!(
341 "Failed to read as key could not be found: {}/{}/{}",
342 PrintableString(primary_namespace),
343 PrintableString(secondary_namespace),
344 PrintableString(key)
345 );
346 io::Error::new(io::ErrorKind::NotFound, msg)
347 },
348 e => {
349 let msg = format!(
350 "Failed to read from key {}/{}/{}: {}",
351 PrintableString(primary_namespace),
352 PrintableString(secondary_namespace),
353 PrintableString(key),
354 e
355 );
356 io::Error::new(io::ErrorKind::Other, msg)
357 },
358 })?;
359 Ok(res)
360 }
361
362 fn write_internal(
363 &self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64,
364 primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
365 ) -> io::Result<()> {
366 check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
367
368 self.execute_locked_write(inner_lock_ref, locking_key, version, || {
369 let locked_conn = self.connection.lock().unwrap();
370
371 let sql = format!(
372 "INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);",
373 self.kv_table_name
374 );
375
376 let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
377 let msg = format!("Failed to prepare statement: {}", e);
378 io::Error::new(io::ErrorKind::Other, msg)
379 })?;
380
381 stmt.execute(named_params! {
382 ":primary_namespace": primary_namespace,
383 ":secondary_namespace": secondary_namespace,
384 ":key": key,
385 ":value": buf,
386 })
387 .map(|_| ())
388 .map_err(|e| {
389 let msg = format!(
390 "Failed to write to key {}/{}/{}: {}",
391 PrintableString(primary_namespace),
392 PrintableString(secondary_namespace),
393 PrintableString(key),
394 e
395 );
396 io::Error::new(io::ErrorKind::Other, msg)
397 })
398 })
399 }
400
401 fn remove_internal(
402 &self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64,
403 primary_namespace: &str, secondary_namespace: &str, key: &str,
404 ) -> io::Result<()> {
405 check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
406
407 self.execute_locked_write(inner_lock_ref, locking_key, version, || {
408 let locked_conn = self.connection.lock().unwrap();
409
410 let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name);
411
412 let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
413 let msg = format!("Failed to prepare statement: {}", e);
414 io::Error::new(io::ErrorKind::Other, msg)
415 })?;
416
417 stmt.execute(named_params! {
418 ":primary_namespace": primary_namespace,
419 ":secondary_namespace": secondary_namespace,
420 ":key": key,
421 })
422 .map_err(|e| {
423 let msg = format!(
424 "Failed to delete key {}/{}/{}: {}",
425 PrintableString(primary_namespace),
426 PrintableString(secondary_namespace),
427 PrintableString(key),
428 e
429 );
430 io::Error::new(io::ErrorKind::Other, msg)
431 })?;
432 Ok(())
433 })
434 }
435
436 fn list_internal(
437 &self, primary_namespace: &str, secondary_namespace: &str,
438 ) -> io::Result<Vec<String>> {
439 check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
440
441 let locked_conn = self.connection.lock().unwrap();
442
443 let sql = format!(
444 "SELECT key FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace",
445 self.kv_table_name
446 );
447 let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
448 let msg = format!("Failed to prepare statement: {}", e);
449 io::Error::new(io::ErrorKind::Other, msg)
450 })?;
451
452 let mut keys = Vec::new();
453
454 let rows_iter = stmt
455 .query_map(
456 named_params! {
457 ":primary_namespace": primary_namespace,
458 ":secondary_namespace": secondary_namespace,
459 },
460 |row| row.get(0),
461 )
462 .map_err(|e| {
463 let msg = format!("Failed to retrieve queried rows: {}", e);
464 io::Error::new(io::ErrorKind::Other, msg)
465 })?;
466
467 for k in rows_iter {
468 keys.push(k.map_err(|e| {
469 let msg = format!("Failed to retrieve queried rows: {}", e);
470 io::Error::new(io::ErrorKind::Other, msg)
471 })?);
472 }
473
474 Ok(keys)
475 }
476
477 fn execute_locked_write<F: FnOnce() -> Result<(), lightning::io::Error>>(
478 &self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64, callback: F,
479 ) -> Result<(), lightning::io::Error> {
480 let res = {
481 let mut last_written_version = inner_lock_ref.lock().unwrap();
482
483 let is_stale_version = version <= *last_written_version;
486
487 if is_stale_version {
489 Ok(())
490 } else {
491 callback().map(|_| {
492 *last_written_version = version;
493 })
494 }
495 };
496
497 self.clean_locks(&inner_lock_ref, locking_key);
498
499 res
500 }
501
502 fn clean_locks(&self, inner_lock_ref: &Arc<Mutex<u64>>, locking_key: String) {
503 let mut outer_lock = self.write_version_locks.lock().unwrap();
508
509 let strong_count = Arc::strong_count(&inner_lock_ref);
510 debug_assert!(strong_count >= 2, "Unexpected SqliteStore strong count");
511
512 if strong_count == 2 {
513 outer_lock.remove(&locking_key);
514 }
515 }
516}
517
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::test_utils::{
        do_read_write_remove_list_persist, do_test_store, random_storage_path,
    };

    // Test-only cleanup: delete the store's data directory once the store is dropped.
    impl Drop for SqliteStore {
        fn drop(&mut self) {
            if let Err(e) = fs::remove_dir_all(&self.inner.data_dir) {
                println!("Failed to remove test store directory: {}", e);
            }
        }
    }

    // Exercises the generic read/write/remove/list round trip against a fresh store.
    #[test]
    fn read_write_remove_list_persist() {
        let store_dir = random_storage_path().join("read_write_remove_list_persist");
        let db_file = Some("test_db".to_string());
        let table = Some("test_table".to_string());
        let store = SqliteStore::new(store_dir, db_file, table).unwrap();
        do_read_write_remove_list_persist(&store);
    }

    // Runs the generic two-store test with two databases living in the same directory.
    #[test]
    fn test_sqlite_store() {
        let store_dir = random_storage_path().join("test_sqlite_store");
        let open_store = |db_file: &str| {
            SqliteStore::new(
                store_dir.clone(),
                Some(db_file.to_string()),
                Some("test_table".to_string()),
            )
            .unwrap()
        };
        let store_0 = open_store("test_db_0");
        let store_1 = open_store("test_db_1");
        do_test_store(&store_0, &store_1)
    }
}
566
/// Benchmarks, only compiled when the `ldk_bench` cfg is set.
#[cfg(ldk_bench)]
pub mod bench {
    use criterion::Criterion;

    use super::SqliteStore;

    /// Runs LDK's two-sends benchmark with both sides persisting to their own `SqliteStore`.
    pub fn bench_sends(bench: &mut Criterion) {
        let open_store = |dir: &str| SqliteStore::new(dir.into(), None, None).unwrap();
        let store_a = open_store("bench_sqlite_store_a");
        let store_b = open_store("bench_sqlite_store_b");
        lightning::ln::channelmanager::bench::bench_two_sends(
            bench,
            "bench_sqlite_persisted_sends",
            store_a,
            store_b,
        );
    }
}