ldk_node/io/sqlite_store/
mod.rs

// This file is Copyright its original authors, visible in version control history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

//! Objects related to [`SqliteStore`] live here.

use std::boxed::Box;
use std::collections::HashMap;
use std::fs;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use lightning::io;
use lightning::util::persist::{KVStore, KVStoreSync};
use lightning_types::string::PrintableString;
use rusqlite::{named_params, Connection};

use crate::io::utils::check_namespace_key_validity;

mod migrations;

/// LDK Node's database file name.
pub const SQLITE_DB_FILE_NAME: &str = "ldk_node_data.sqlite";
/// LDK Node's table in which we store all data.
pub const KV_TABLE_NAME: &str = "ldk_node_data";

/// The default database file name.
pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite";

/// The default table in which we store all data.
pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data";

// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration.
const SCHEMA_USER_VERSION: u16 = 2;

/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from an [SQLite] database.
///
/// [SQLite]: https://sqlite.org
pub struct SqliteStore {
	inner: Arc<SqliteStoreInner>,

	// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
	// operations aren't sensitive to the order of execution.
	next_write_version: AtomicU64,
}

impl SqliteStore {
	/// Constructs a new [`SqliteStore`].
	///
	/// If it doesn't already exist, a new SQLite database will be created in the given `data_dir` under the
	/// given `db_file_name` (defaulting to [`DEFAULT_SQLITE_DB_FILE_NAME`] if set to `None`).
	///
	/// Similarly, the given `kv_table_name` will be used, defaulting to [`DEFAULT_KV_TABLE_NAME`] if set to `None`.
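	///
	/// A minimal usage sketch (the path and namespace values below are purely illustrative):
	///
	/// ```no_run
	/// use ldk_node::io::sqlite_store::SqliteStore;
	/// use lightning::util::persist::KVStoreSync;
	///
	/// let store = SqliteStore::new("/tmp/ldk_node_store".into(), None, None).unwrap();
	/// KVStoreSync::write(&store, "primary_ns", "secondary_ns", "key", vec![1, 2, 3]).unwrap();
	/// assert_eq!(KVStoreSync::read(&store, "primary_ns", "secondary_ns", "key").unwrap(), vec![1, 2, 3]);
	/// ```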
	pub fn new(
		data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>,
	) -> io::Result<Self> {
		let inner = Arc::new(SqliteStoreInner::new(data_dir, db_file_name, kv_table_name)?);
		let next_write_version = AtomicU64::new(1);
		Ok(Self { inner, next_write_version })
	}

	fn build_locking_key(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> String {
		format!("{}#{}#{}", primary_namespace, secondary_namespace, key)
	}

	fn get_new_version_and_lock_ref(&self, locking_key: String) -> (Arc<Mutex<u64>>, u64) {
		let version = self.next_write_version.fetch_add(1, Ordering::Relaxed);
		if version == u64::MAX {
			panic!("SqliteStore version counter overflowed");
		}

		// Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for
		// cleaning up unused locks.
		let inner_lock_ref = self.inner.get_inner_lock_ref(locking_key);

		(inner_lock_ref, version)
	}

	/// Returns the data directory.
	pub fn get_data_dir(&self) -> PathBuf {
		self.inner.data_dir.clone()
	}
}

impl KVStore for SqliteStore {
	fn read(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + Send>> {
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		let key = key.to_string();
		let inner = Arc::clone(&self.inner);
		let fut = tokio::task::spawn_blocking(move || {
			inner.read_internal(&primary_namespace, &secondary_namespace, &key)
		});
		Box::pin(async move {
			fut.await.unwrap_or_else(|e| {
				let msg = format!("Failed to execute IO operation due to join error: {}", e);
				Err(io::Error::new(io::ErrorKind::Other, msg))
			})
		})
	}

	fn write(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
	) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
		let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
		let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		let key = key.to_string();
		let inner = Arc::clone(&self.inner);
		let fut = tokio::task::spawn_blocking(move || {
			inner.write_internal(
				inner_lock_ref,
				locking_key,
				version,
				&primary_namespace,
				&secondary_namespace,
				&key,
				buf,
			)
		});
		Box::pin(async move {
			fut.await.unwrap_or_else(|e| {
				let msg = format!("Failed to execute IO operation due to join error: {}", e);
				Err(io::Error::new(io::ErrorKind::Other, msg))
			})
		})
	}

	fn remove(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
	) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
		let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
		let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		let key = key.to_string();
		let inner = Arc::clone(&self.inner);
		let fut = tokio::task::spawn_blocking(move || {
			inner.remove_internal(
				inner_lock_ref,
				locking_key,
				version,
				&primary_namespace,
				&secondary_namespace,
				&key,
			)
		});
		Box::pin(async move {
			fut.await.unwrap_or_else(|e| {
				let msg = format!("Failed to execute IO operation due to join error: {}", e);
				Err(io::Error::new(io::ErrorKind::Other, msg))
			})
		})
	}

	fn list(
		&self, primary_namespace: &str, secondary_namespace: &str,
	) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + Send>> {
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		let inner = Arc::clone(&self.inner);
		let fut = tokio::task::spawn_blocking(move || {
			inner.list_internal(&primary_namespace, &secondary_namespace)
		});
		Box::pin(async move {
			fut.await.unwrap_or_else(|e| {
				let msg = format!("Failed to execute IO operation due to join error: {}", e);
				Err(io::Error::new(io::ErrorKind::Other, msg))
			})
		})
	}
}
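
// Note: the async `KVStore` implementation above offloads each operation to
// `tokio::task::spawn_blocking`, since the underlying rusqlite `Connection` is synchronous and
// guarded by a `Mutex`. The `KVStoreSync` implementation below calls into the same internals
// directly on the current thread.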

impl KVStoreSync for SqliteStore {
	fn read(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> io::Result<Vec<u8>> {
		self.inner.read_internal(primary_namespace, secondary_namespace, key)
	}

	fn write(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
	) -> io::Result<()> {
		let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
		let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
		self.inner.write_internal(
			inner_lock_ref,
			locking_key,
			version,
			primary_namespace,
			secondary_namespace,
			key,
			buf,
		)
	}

	fn remove(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
	) -> io::Result<()> {
		let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
		let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
		self.inner.remove_internal(
			inner_lock_ref,
			locking_key,
			version,
			primary_namespace,
			secondary_namespace,
			key,
		)
	}

	fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
		self.inner.list_internal(primary_namespace, secondary_namespace)
	}
}

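// Shared internals used by both the async `KVStore` and the sync `KVStoreSync` code paths: the
// SQLite connection (serialized behind a `Mutex`), the resolved data directory and table name, and
// the per-key write-version locks consulted by `execute_locked_write` to skip stale writes.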
struct SqliteStoreInner {
	connection: Arc<Mutex<Connection>>,
	data_dir: PathBuf,
	kv_table_name: String,
	write_version_locks: Mutex<HashMap<String, Arc<Mutex<u64>>>>,
}

impl SqliteStoreInner {
	fn new(
		data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>,
	) -> io::Result<Self> {
		let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string());
		let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string());

		fs::create_dir_all(data_dir.clone()).map_err(|e| {
			let msg = format!(
				"Failed to create database destination directory {}: {}",
				data_dir.display(),
				e
			);
			io::Error::new(io::ErrorKind::Other, msg)
		})?;
		let mut db_file_path = data_dir.clone();
		db_file_path.push(db_file_name);

		let mut connection = Connection::open(db_file_path.clone()).map_err(|e| {
			let msg =
				format!("Failed to open/create database file {}: {}", db_file_path.display(), e);
			io::Error::new(io::ErrorKind::Other, msg)
		})?;

		let sql = format!("SELECT user_version FROM pragma_user_version");
		let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap();

		if version_res == 0 {
			// New database, set our SCHEMA_USER_VERSION and continue
			connection
				.pragma(
					Some(rusqlite::DatabaseName::Main),
					"user_version",
					SCHEMA_USER_VERSION,
					|_| Ok(()),
				)
				.map_err(|e| {
					let msg = format!("Failed to set PRAGMA user_version: {}", e);
					io::Error::new(io::ErrorKind::Other, msg)
				})?;
		} else if version_res < SCHEMA_USER_VERSION {
			migrations::migrate_schema(
				&mut connection,
				&kv_table_name,
				version_res,
				SCHEMA_USER_VERSION,
			)?;
		} else if version_res > SCHEMA_USER_VERSION {
			let msg = format!(
				"Failed to open database: incompatible schema version {}. Expected: {}",
				version_res, SCHEMA_USER_VERSION
			);
			return Err(io::Error::new(io::ErrorKind::Other, msg));
		}

		let sql = format!(
			"CREATE TABLE IF NOT EXISTS {} (
			primary_namespace TEXT NOT NULL,
			secondary_namespace TEXT DEFAULT \"\" NOT NULL,
			key TEXT NOT NULL CHECK (key <> ''),
			value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
			);",
			kv_table_name
		);

		connection.execute(&sql, []).map_err(|e| {
			let msg = format!("Failed to create table {}: {}", kv_table_name, e);
			io::Error::new(io::ErrorKind::Other, msg)
		})?;

		let connection = Arc::new(Mutex::new(connection));
		let write_version_locks = Mutex::new(HashMap::new());
		Ok(Self { connection, data_dir, kv_table_name, write_version_locks })
	}

	fn get_inner_lock_ref(&self, locking_key: String) -> Arc<Mutex<u64>> {
		let mut outer_lock = self.write_version_locks.lock().unwrap();
		Arc::clone(&outer_lock.entry(locking_key).or_default())
	}

	fn read_internal(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> io::Result<Vec<u8>> {
		check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;

		let locked_conn = self.connection.lock().unwrap();
		let sql =
			format!("SELECT value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;",
			self.kv_table_name);

		let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
			let msg = format!("Failed to prepare statement: {}", e);
			io::Error::new(io::ErrorKind::Other, msg)
		})?;

		let res = stmt
			.query_row(
				named_params! {
					":primary_namespace": primary_namespace,
					":secondary_namespace": secondary_namespace,
					":key": key,
				},
				|row| row.get(0),
			)
			.map_err(|e| match e {
				rusqlite::Error::QueryReturnedNoRows => {
					let msg = format!(
						"Failed to read as key could not be found: {}/{}/{}",
						PrintableString(primary_namespace),
						PrintableString(secondary_namespace),
						PrintableString(key)
					);
					io::Error::new(io::ErrorKind::NotFound, msg)
				},
				e => {
					let msg = format!(
						"Failed to read from key {}/{}/{}: {}",
						PrintableString(primary_namespace),
						PrintableString(secondary_namespace),
						PrintableString(key),
						e
					);
					io::Error::new(io::ErrorKind::Other, msg)
				},
			})?;
		Ok(res)
	}

	fn write_internal(
		&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64,
		primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
	) -> io::Result<()> {
		check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;

		self.execute_locked_write(inner_lock_ref, locking_key, version, || {
			let locked_conn = self.connection.lock().unwrap();

			let sql = format!(
				"INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);",
				self.kv_table_name
			);

			let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
				let msg = format!("Failed to prepare statement: {}", e);
				io::Error::new(io::ErrorKind::Other, msg)
			})?;

			stmt.execute(named_params! {
				":primary_namespace": primary_namespace,
				":secondary_namespace": secondary_namespace,
				":key": key,
				":value": buf,
			})
			.map(|_| ())
			.map_err(|e| {
				let msg = format!(
					"Failed to write to key {}/{}/{}: {}",
					PrintableString(primary_namespace),
					PrintableString(secondary_namespace),
					PrintableString(key),
					e
				);
				io::Error::new(io::ErrorKind::Other, msg)
			})
		})
	}

	fn remove_internal(
		&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64,
		primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> io::Result<()> {
		check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;

		self.execute_locked_write(inner_lock_ref, locking_key, version, || {
			let locked_conn = self.connection.lock().unwrap();

			let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name);

			let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
				let msg = format!("Failed to prepare statement: {}", e);
				io::Error::new(io::ErrorKind::Other, msg)
			})?;

			stmt.execute(named_params! {
				":primary_namespace": primary_namespace,
				":secondary_namespace": secondary_namespace,
				":key": key,
			})
			.map_err(|e| {
				let msg = format!(
					"Failed to delete key {}/{}/{}: {}",
					PrintableString(primary_namespace),
					PrintableString(secondary_namespace),
					PrintableString(key),
					e
				);
				io::Error::new(io::ErrorKind::Other, msg)
			})?;
			Ok(())
		})
	}

	fn list_internal(
		&self, primary_namespace: &str, secondary_namespace: &str,
	) -> io::Result<Vec<String>> {
		check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;

		let locked_conn = self.connection.lock().unwrap();

		let sql = format!(
			"SELECT key FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace",
			self.kv_table_name
		);
		let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
			let msg = format!("Failed to prepare statement: {}", e);
			io::Error::new(io::ErrorKind::Other, msg)
		})?;

		let mut keys = Vec::new();

		let rows_iter = stmt
			.query_map(
				named_params! {
					":primary_namespace": primary_namespace,
					":secondary_namespace": secondary_namespace,
				},
				|row| row.get(0),
			)
			.map_err(|e| {
				let msg = format!("Failed to retrieve queried rows: {}", e);
				io::Error::new(io::ErrorKind::Other, msg)
			})?;

		for k in rows_iter {
			keys.push(k.map_err(|e| {
				let msg = format!("Failed to retrieve queried rows: {}", e);
				io::Error::new(io::ErrorKind::Other, msg)
			})?);
		}

		Ok(keys)
	}

	fn execute_locked_write<F: FnOnce() -> Result<(), lightning::io::Error>>(
		&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64, callback: F,
	) -> Result<(), lightning::io::Error> {
		let res = {
			let mut last_written_version = inner_lock_ref.lock().unwrap();

			// Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
			// consistency.
			let is_stale_version = version <= *last_written_version;

			// If the version is not stale, we execute the callback. Otherwise we can and must skip writing.
			if is_stale_version {
				Ok(())
			} else {
				callback().map(|_| {
					*last_written_version = version;
				})
			}
		};

		self.clean_locks(&inner_lock_ref, locking_key);

		res
	}

	fn clean_locks(&self, inner_lock_ref: &Arc<Mutex<u64>>, locking_key: String) {
		// If there are no Arcs in use elsewhere, there are no in-flight writes and we can remove the map entry
		// to prevent leaking memory. The two Arcs that are expected are the one in the map and the one held here in
		// inner_lock_ref. The outer lock is obtained first, to avoid a new Arc being cloned after we've already
		// counted.
		let mut outer_lock = self.write_version_locks.lock().unwrap();

		let strong_count = Arc::strong_count(inner_lock_ref);
		debug_assert!(strong_count >= 2, "Unexpected SqliteStore strong count");

		if strong_count == 2 {
			outer_lock.remove(&locking_key);
		}
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use crate::io::test_utils::{
		do_read_write_remove_list_persist, do_test_store, random_storage_path,
	};

	impl Drop for SqliteStore {
		fn drop(&mut self) {
			match fs::remove_dir_all(&self.inner.data_dir) {
				Err(e) => println!("Failed to remove test store directory: {}", e),
				_ => {},
			}
		}
	}

	#[test]
	fn read_write_remove_list_persist() {
		let mut temp_path = random_storage_path();
		temp_path.push("read_write_remove_list_persist");
		let store = SqliteStore::new(
			temp_path,
			Some("test_db".to_string()),
			Some("test_table".to_string()),
		)
		.unwrap();
		do_read_write_remove_list_persist(&store);
	}

	#[test]
	fn test_sqlite_store() {
		let mut temp_path = random_storage_path();
		temp_path.push("test_sqlite_store");
		let store_0 = SqliteStore::new(
			temp_path.clone(),
			Some("test_db_0".to_string()),
			Some("test_table".to_string()),
		)
		.unwrap();
		let store_1 = SqliteStore::new(
			temp_path,
			Some("test_db_1".to_string()),
			Some("test_table".to_string()),
		)
		.unwrap();
		do_test_store(&store_0, &store_1)
	}
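
	// A minimal sketch exercising the write-versioning logic via the crate-internal helpers
	// `build_locking_key`, `get_new_version_and_lock_ref`, and `write_internal`: a stale (older)
	// version that lands after a newer one must be skipped, so the newer value wins.
	#[test]
	fn stale_write_versions_are_skipped() {
		let mut temp_path = random_storage_path();
		temp_path.push("stale_write_versions_are_skipped");
		let store = SqliteStore::new(
			temp_path,
			Some("test_db".to_string()),
			Some("test_table".to_string()),
		)
		.unwrap();

		// Reserve two write versions up front, then apply them out of order.
		let locking_key = store.build_locking_key("primary", "secondary", "key");
		let (lock_ref_v1, v1) = store.get_new_version_and_lock_ref(locking_key.clone());
		let (lock_ref_v2, v2) = store.get_new_version_and_lock_ref(locking_key.clone());
		assert!(v2 > v1);

		// The newer write lands first...
		store
			.inner
			.write_internal(lock_ref_v2, locking_key.clone(), v2, "primary", "secondary", "key", vec![2])
			.unwrap();
		// ...and the stale write is silently dropped.
		store
			.inner
			.write_internal(lock_ref_v1, locking_key, v1, "primary", "secondary", "key", vec![1])
			.unwrap();

		assert_eq!(KVStoreSync::read(&store, "primary", "secondary", "key").unwrap(), vec![2]);
	}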
}

#[cfg(ldk_bench)]
/// Benches
pub mod bench {
	use criterion::Criterion;

	/// Bench!
	pub fn bench_sends(bench: &mut Criterion) {
		let store_a = super::SqliteStore::new("bench_sqlite_store_a".into(), None, None).unwrap();
		let store_b = super::SqliteStore::new("bench_sqlite_store_b".into(), None, None).unwrap();
		lightning::ln::channelmanager::bench::bench_two_sends(
			bench,
			"bench_sqlite_persisted_sends",
			store_a,
			store_b,
		);
	}
}