ldk_node/io/sqlite_store/
mod.rs

1// This file is Copyright its original authors, visible in version control history.
2//
3// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6// accordance with one or both of these licenses.
7
8//! Objects related to [`SqliteStore`] live here.
9use crate::io::utils::check_namespace_key_validity;
10
11use lightning::io;
12use lightning::util::persist::KVStore;
13use lightning::util::string::PrintableString;
14
15use rusqlite::{named_params, Connection};
16
17use std::fs;
18use std::path::PathBuf;
19use std::sync::{Arc, Mutex};
20
21mod migrations;
22
23/// LDK Node's database file name.
24pub const SQLITE_DB_FILE_NAME: &str = "ldk_node_data.sqlite";
25/// LDK Node's table in which we store all data.
26pub const KV_TABLE_NAME: &str = "ldk_node_data";
27
28/// The default database file name.
29pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite";
30
31/// The default table in which we store all data.
32pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data";
33
34// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration.
35const SCHEMA_USER_VERSION: u16 = 2;
36
37/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database.
38///
39/// [SQLite]: https://sqlite.org
40pub struct SqliteStore {
41	connection: Arc<Mutex<Connection>>,
42	data_dir: PathBuf,
43	kv_table_name: String,
44}
45
46impl SqliteStore {
47	/// Constructs a new [`SqliteStore`].
48	///
49	/// If not already existing, a new SQLite database will be created in the given `data_dir` under the
50	/// given `db_file_name` (or the default to [`DEFAULT_SQLITE_DB_FILE_NAME`] if set to `None`).
51	///
52	/// Similarly, the given `kv_table_name` will be used or default to [`DEFAULT_KV_TABLE_NAME`].
53	pub fn new(
54		data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>,
55	) -> io::Result<Self> {
56		let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string());
57		let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string());
58
59		fs::create_dir_all(data_dir.clone()).map_err(|e| {
60			let msg = format!(
61				"Failed to create database destination directory {}: {}",
62				data_dir.display(),
63				e
64			);
65			io::Error::new(io::ErrorKind::Other, msg)
66		})?;
67		let mut db_file_path = data_dir.clone();
68		db_file_path.push(db_file_name);
69
70		let mut connection = Connection::open(db_file_path.clone()).map_err(|e| {
71			let msg =
72				format!("Failed to open/create database file {}: {}", db_file_path.display(), e);
73			io::Error::new(io::ErrorKind::Other, msg)
74		})?;
75
76		let sql = format!("SELECT user_version FROM pragma_user_version");
77		let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap();
78
79		if version_res == 0 {
80			// New database, set our SCHEMA_USER_VERSION and continue
81			connection
82				.pragma(
83					Some(rusqlite::DatabaseName::Main),
84					"user_version",
85					SCHEMA_USER_VERSION,
86					|_| Ok(()),
87				)
88				.map_err(|e| {
89					let msg = format!("Failed to set PRAGMA user_version: {}", e);
90					io::Error::new(io::ErrorKind::Other, msg)
91				})?;
92		} else if version_res < SCHEMA_USER_VERSION {
93			migrations::migrate_schema(
94				&mut connection,
95				&kv_table_name,
96				version_res,
97				SCHEMA_USER_VERSION,
98			)?;
99		} else if version_res > SCHEMA_USER_VERSION {
100			let msg = format!(
101				"Failed to open database: incompatible schema version {}. Expected: {}",
102				version_res, SCHEMA_USER_VERSION
103			);
104			return Err(io::Error::new(io::ErrorKind::Other, msg));
105		}
106
107		let sql = format!(
108			"CREATE TABLE IF NOT EXISTS {} (
109			primary_namespace TEXT NOT NULL,
110			secondary_namespace TEXT DEFAULT \"\" NOT NULL,
111			key TEXT NOT NULL CHECK (key <> ''),
112			value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
113			);",
114			kv_table_name
115		);
116
117		connection.execute(&sql, []).map_err(|e| {
118			let msg = format!("Failed to create table {}: {}", kv_table_name, e);
119			io::Error::new(io::ErrorKind::Other, msg)
120		})?;
121
122		let connection = Arc::new(Mutex::new(connection));
123		Ok(Self { connection, data_dir, kv_table_name })
124	}
125
126	/// Returns the data directory.
127	pub fn get_data_dir(&self) -> PathBuf {
128		self.data_dir.clone()
129	}
130}
131
132impl KVStore for SqliteStore {
133	fn read(
134		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
135	) -> io::Result<Vec<u8>> {
136		check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
137
138		let locked_conn = self.connection.lock().unwrap();
139		let sql =
140			format!("SELECT value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;",
141			self.kv_table_name);
142
143		let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
144			let msg = format!("Failed to prepare statement: {}", e);
145			io::Error::new(io::ErrorKind::Other, msg)
146		})?;
147
148		let res = stmt
149			.query_row(
150				named_params! {
151					":primary_namespace": primary_namespace,
152					":secondary_namespace": secondary_namespace,
153					":key": key,
154				},
155				|row| row.get(0),
156			)
157			.map_err(|e| match e {
158				rusqlite::Error::QueryReturnedNoRows => {
159					let msg = format!(
160						"Failed to read as key could not be found: {}/{}/{}",
161						PrintableString(primary_namespace),
162						PrintableString(secondary_namespace),
163						PrintableString(key)
164					);
165					io::Error::new(io::ErrorKind::NotFound, msg)
166				},
167				e => {
168					let msg = format!(
169						"Failed to read from key {}/{}/{}: {}",
170						PrintableString(primary_namespace),
171						PrintableString(secondary_namespace),
172						PrintableString(key),
173						e
174					);
175					io::Error::new(io::ErrorKind::Other, msg)
176				},
177			})?;
178		Ok(res)
179	}
180
181	fn write(
182		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
183	) -> io::Result<()> {
184		check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
185
186		let locked_conn = self.connection.lock().unwrap();
187
188		let sql = format!(
189			"INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);",
190			self.kv_table_name
191		);
192
193		let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
194			let msg = format!("Failed to prepare statement: {}", e);
195			io::Error::new(io::ErrorKind::Other, msg)
196		})?;
197
198		stmt.execute(named_params! {
199			":primary_namespace": primary_namespace,
200			":secondary_namespace": secondary_namespace,
201			":key": key,
202			":value": buf,
203		})
204		.map(|_| ())
205		.map_err(|e| {
206			let msg = format!(
207				"Failed to write to key {}/{}/{}: {}",
208				PrintableString(primary_namespace),
209				PrintableString(secondary_namespace),
210				PrintableString(key),
211				e
212			);
213			io::Error::new(io::ErrorKind::Other, msg)
214		})
215	}
216
217	fn remove(
218		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
219	) -> io::Result<()> {
220		check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
221
222		let locked_conn = self.connection.lock().unwrap();
223
224		let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name);
225
226		let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
227			let msg = format!("Failed to prepare statement: {}", e);
228			io::Error::new(io::ErrorKind::Other, msg)
229		})?;
230
231		stmt.execute(named_params! {
232			":primary_namespace": primary_namespace,
233			":secondary_namespace": secondary_namespace,
234			":key": key,
235		})
236		.map_err(|e| {
237			let msg = format!(
238				"Failed to delete key {}/{}/{}: {}",
239				PrintableString(primary_namespace),
240				PrintableString(secondary_namespace),
241				PrintableString(key),
242				e
243			);
244			io::Error::new(io::ErrorKind::Other, msg)
245		})?;
246		Ok(())
247	}
248
249	fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
250		check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
251
252		let locked_conn = self.connection.lock().unwrap();
253
254		let sql = format!(
255			"SELECT key FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace",
256			self.kv_table_name
257		);
258		let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
259			let msg = format!("Failed to prepare statement: {}", e);
260			io::Error::new(io::ErrorKind::Other, msg)
261		})?;
262
263		let mut keys = Vec::new();
264
265		let rows_iter = stmt
266			.query_map(
267				named_params! {
268						":primary_namespace": primary_namespace,
269						":secondary_namespace": secondary_namespace,
270				},
271				|row| row.get(0),
272			)
273			.map_err(|e| {
274				let msg = format!("Failed to retrieve queried rows: {}", e);
275				io::Error::new(io::ErrorKind::Other, msg)
276			})?;
277
278		for k in rows_iter {
279			keys.push(k.map_err(|e| {
280				let msg = format!("Failed to retrieve queried rows: {}", e);
281				io::Error::new(io::ErrorKind::Other, msg)
282			})?);
283		}
284
285		Ok(keys)
286	}
287}
288
289#[cfg(test)]
290mod tests {
291	use super::*;
292	use crate::io::test_utils::{
293		do_read_write_remove_list_persist, do_test_store, random_storage_path,
294	};
295
296	impl Drop for SqliteStore {
297		fn drop(&mut self) {
298			match fs::remove_dir_all(&self.data_dir) {
299				Err(e) => println!("Failed to remove test store directory: {}", e),
300				_ => {},
301			}
302		}
303	}
304
305	#[test]
306	fn read_write_remove_list_persist() {
307		let mut temp_path = random_storage_path();
308		temp_path.push("read_write_remove_list_persist");
309		let store = SqliteStore::new(
310			temp_path,
311			Some("test_db".to_string()),
312			Some("test_table".to_string()),
313		)
314		.unwrap();
315		do_read_write_remove_list_persist(&store);
316	}
317
318	#[test]
319	fn test_sqlite_store() {
320		let mut temp_path = random_storage_path();
321		temp_path.push("test_sqlite_store");
322		let store_0 = SqliteStore::new(
323			temp_path.clone(),
324			Some("test_db_0".to_string()),
325			Some("test_table".to_string()),
326		)
327		.unwrap();
328		let store_1 = SqliteStore::new(
329			temp_path,
330			Some("test_db_1".to_string()),
331			Some("test_table".to_string()),
332		)
333		.unwrap();
334		do_test_store(&store_0, &store_1)
335	}
336}
337
338#[cfg(ldk_bench)]
339/// Benches
340pub mod bench {
341	use criterion::Criterion;
342
343	/// Bench!
344	pub fn bench_sends(bench: &mut Criterion) {
345		let store_a = super::SqliteStore::new("bench_sqlite_store_a".into(), None, None).unwrap();
346		let store_b = super::SqliteStore::new("bench_sqlite_store_b".into(), None, None).unwrap();
347		lightning::ln::channelmanager::bench::bench_two_sends(
348			bench,
349			"bench_sqlite_persisted_sends",
350			store_a,
351			store_b,
352		);
353	}
354}