1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
#[macro_use] extern crate lightning;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
use lightning::util::logger::Logger;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Monitoring whether the `ChannelManager` needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   `ChannelManager` persistence should be done in the background.
/// * Calling `ChannelManager::timer_chan_freshness_every_min()` every minute (can be done in the
///   background).
///
/// Note that if `ChannelManager` persistence fails and the persisted manager becomes out-of-date,
/// then there is a risk of channels force-closing on startup when the manager realizes it is
/// outdated. However, as long as `ChannelMonitor` backups are sound, no funds besides those used
/// for unilateral chain closure fees are at risk.
pub struct BackgroundProcessor {
	/// Stop flag shared with the background thread: `stop()` stores `true` into it and the
	/// thread returns once it loads that value.
	stop_thread: Arc<AtomicBool>,
	/// May be used to retrieve and handle the error if `BackgroundProcessor`'s thread
	/// exits due to an error while persisting.
	pub thread_handle: JoinHandle<Result<(), std::io::Error>>,
}

/// Number of seconds of elapsed time (measured since the previous call) that the background
/// thread compares against before calling `ChannelManager::timer_chan_freshness_every_min()`.
#[cfg(not(test))]
const CHAN_FRESHNESS_TIMER: u64 = 60;
/// Test builds use a one-second value instead of a minute (presumably so that the tests which
/// wait for the timer to fire finish quickly).
#[cfg(test)]
const CHAN_FRESHNESS_TIMER: u64 = 1;

impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the top-level
	/// documentation.
	///
	/// The thread loops until stopped. On each iteration it:
	/// 1. waits up to 100ms for the `ChannelManager` to signal a persistable update and, if one
	///    is available, invokes `persist_manager`;
	/// 2. exits with `Ok(())` if [`stop`] has been called;
	/// 3. calls `ChannelManager::timer_chan_freshness_every_min()` once at least
	///    `CHAN_FRESHNESS_TIMER` seconds have elapsed since the previous call (or since the
	///    thread started).
	///
	/// If `persist_manager` returns an error, then this thread will return said error (and `start()`
	/// will need to be called again to restart the `BackgroundProcessor`). Users should wait on
	/// [`thread_handle`]'s `join()` method to be able to tell if and when an error is returned, or
	/// implement `persist_manager` such that an error is never returned to the `BackgroundProcessor`.
	///
	/// `persist_manager` is responsible for writing out the `ChannelManager` to disk, and/or uploading
	/// to one or more backup services. See [`ChannelManager::write`] for writing out a `ChannelManager`.
	/// See [`FilesystemPersister::persist_manager`] for Rust-Lightning's provided implementation.
	///
	/// `logger` receives trace-level messages when the timer fires and when the thread terminates.
	///
	/// [`stop`]: struct.BackgroundProcessor.html#method.stop
	/// [`thread_handle`]: struct.BackgroundProcessor.html#structfield.thread_handle
	/// [`ChannelManager::write`]: ../lightning/ln/channelmanager/struct.ChannelManager.html#method.write
	/// [`FilesystemPersister::persist_manager`]: ../lightning_persister/struct.FilesystemPersister.html#impl
	pub fn start<PM, Signer, M, T, K, F, L>(persist_manager: PM, manager: Arc<ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>>, logger: Arc<L>) -> Self
	where Signer: 'static + Sign,
	      M: 'static + chain::Watch<Signer>,
	      T: 'static + BroadcasterInterface,
	      K: 'static + KeysInterface<Signer=Signer>,
	      F: 'static + FeeEstimator,
	      L: 'static + Logger,
	      PM: 'static + Send + Fn(&ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>) -> Result<(), std::io::Error>,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		// Compare whole `Duration`s with `>=` rather than truncated seconds with `>`: the latter
		// only fires once a full extra second has passed (e.g. 61s for a 60s timer).
		let freshness_interval = Duration::from_secs(CHAN_FRESHNESS_TIMER);
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			let mut last_freshness_call = Instant::now();
			loop {
				// The bounded wait doubles as the polling interval for the stop flag and the
				// freshness timer below.
				if manager.await_persistable_update_timeout(Duration::from_millis(100)) {
					persist_manager(&*manager)?;
				}
				// Exit the loop if the background processor was requested to stop. This is
				// checked after persisting so a pending update is still written out first.
				if stop_thread.load(Ordering::Acquire) {
					log_trace!(logger, "Terminating background processor.");
					return Ok(());
				}
				if last_freshness_call.elapsed() >= freshness_interval {
					log_trace!(logger, "Calling manager's timer_chan_freshness_every_min");
					manager.timer_chan_freshness_every_min();
					last_freshness_call = Instant::now();
				}
			}
		});
		Self {
			stop_thread: stop_thread_clone,
			thread_handle: handle,
		}
	}

	/// Stop `BackgroundProcessor`'s thread.
	///
	/// Sets the stop flag and then blocks until the background thread has exited.
	///
	/// # Errors
	///
	/// Returns the error produced by `persist_manager` if the thread exited (or exits while
	/// shutting down) because persistence failed.
	///
	/// # Panics
	///
	/// Panics if the background thread itself panicked.
	pub fn stop(self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.thread_handle.join().unwrap()
	}
}

#[cfg(test)]
mod tests {
	use bitcoin::blockdata::constants::genesis_block;
	use bitcoin::blockdata::transaction::{Transaction, TxOut};
	use bitcoin::network::constants::Network;
	use lightning::chain;
	use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
	use lightning::chain::chainmonitor;
	use lightning::chain::keysinterface::{Sign, InMemorySigner, KeysInterface, KeysManager};
	use lightning::chain::transaction::OutPoint;
	use lightning::get_event_msg;
	use lightning::ln::channelmanager::{ChainParameters, ChannelManager, SimpleArcChannelManager};
	use lightning::ln::features::InitFeatures;
	use lightning::ln::msgs::ChannelMessageHandler;
	use lightning::util::config::UserConfig;
	use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent};
	use lightning::util::logger::Logger;
	use lightning::util::ser::Writeable;
	use lightning::util::test_utils;
	use lightning_persister::FilesystemPersister;
	use std::fs;
	use std::path::PathBuf;
	use std::sync::{Arc, Mutex};
	use std::time::Duration;
	use super::BackgroundProcessor;

	// Concrete `chainmonitor::ChainMonitor` instantiation used by every test node: an in-memory
	// signer, the `test_utils` chain source / broadcaster / fee estimator / logger, and a
	// `FilesystemPersister`.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	/// Per-node state the tests need to hold on to. Dropping a `Node` removes its persister's
	/// data directory.
	struct Node {
		/// The channel manager under test; a clone of this `Arc` is handed to
		/// `BackgroundProcessor::start`.
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		/// Persister whose data directory backs this node.
		persister: Arc<FilesystemPersister>,
		/// Logger shared with the manager; tests inspect the lines it recorded.
		logger: Arc<test_utils::TestLogger>,
	}

	// Removes the node's persister directory when the node goes out of scope so that test runs
	// do not leave files behind.
	impl Drop for Node {
		fn drop(&mut self) {
			let dir = self.persister.get_data_dir();
			// Best-effort cleanup: report a failure, but never panic from inside `drop`.
			if let Err(err) = fs::remove_dir_all(dir.clone()) {
				println!("Failed to remove test persister directory {}: {}", dir, err);
			}
		}
	}

	/// Appends `filename` to the directory `filepath` using the platform's path rules and returns
	/// the combined path as an owned `String`.
	fn get_full_filepath(filepath: String, filename: String) -> String {
		let combined = PathBuf::from(filepath).join(filename);
		// Both components started out as `String`s, so the combined path is valid UTF-8.
		combined.to_str().unwrap().to_string()
	}

	/// Builds `num_nodes` independent test nodes on testnet.
	///
	/// Node `i` gets a logger tagged `"node i"`, a `FilesystemPersister` rooted at
	/// `"{persist_dir}_persister_{i}"`, a key seed made of 32 copies of `i as u8`, and a
	/// `ChannelManager` whose chain tip is the testnet genesis block at height 0.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		(0..num_nodes).map(|idx| {
			// Test doubles for the chain-facing interfaces.
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", idx)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, idx)));

			// Keys are derived deterministically from the node index and the genesis timestamp.
			let network = Network::Testnet;
			let genesis = genesis_block(network);
			let seed = [idx as u8; 32];
			let start_time = Duration::from_secs(genesis.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, start_time.as_secs(), start_time.subsec_nanos()));

			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let params = ChainParameters {
				network,
				latest_hash: genesis.block_hash(),
				latest_height: 0,
			};
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), params));

			Node { node: manager, persister, logger }
		}).collect()
	}

	// Drives the channel-funding handshake between two `Node`s, with `$node_a` initiating a channel
	// of `$channel_value` satoshis to `$node_b`. Each message is pulled from the sender's pending
	// events with `get_event_msg!` and delivered straight to the peer's handler, in this order:
	// open_channel, accept_channel, funding_created, funding_signed.
	//
	// Evaluates to the funding `Transaction` built along the way.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			// 42 is the user channel id checked against the event below; 100 is presumably the
			// amount pushed to the counterparty (confirm against `create_channel`'s signature).
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
			// After accept_channel the initiator must have exactly one pending event asking for
			// the funding transaction.
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx, funding_output) = match events[0] {
				Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => {
					assert_eq!(*channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					// Input-less transaction whose single output pays the requested script the
					// full channel value; output index 0 is the funding outpoint.
					let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
						value: *channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					let funding_outpoint = OutPoint { txid: tx.txid(), index: 0 };
					(*temporary_channel_id, tx, funding_outpoint)
				},
				_ => panic!("Unexpected event"),
			};

			// Hand the funding outpoint to the initiator and finish the funding exchange.
			$node_a.node.funding_transaction_generated(&temporary_channel_id, funding_output);
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
			tx
		}}
	}

	#[test]
	fn test_background_processor() {
		// Verifies that the background processor re-persists the ChannelManager whenever it has
		// persistable updates: first after a channel is opened, then again after that channel is
		// force-closed.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Start a background processor for node 0 that persists into the node's own data directory.
		let data_dir = nodes[0].persister.get_data_dir();
		let persist = move |mgr: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), mgr);
		let bg_processor = BackgroundProcessor::start(persist, nodes[0].node.clone(), nodes[0].logger.clone());

		// Opening a channel gives node 0 something that must be persisted.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Serializes `$node` into `$expected_bytes`, then spins until the file at `$filepath`
		// exists and holds exactly those bytes.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr, $expected_bytes: expr) => {{
				if let Err(e) = $node.write(&mut $expected_bytes) {
					panic!("Unexpected error: {}", e);
				}
				loop {
					// A missing/unreadable file or stale contents both mean "not written yet".
					if let Ok(bytes) = std::fs::read($filepath) {
						if bytes == $expected_bytes {
							break
						}
					}
				}
			}}
		}

		// The manager state after the channel open must reach disk...
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		let mut bytes_after_open = Vec::new();
		check_persisted_data!(nodes[0].node, filepath.clone(), bytes_after_open);
		// ...and the manager must stop signalling that persistence is needed.
		while nodes[0].node.get_persistence_condvar_value() {}

		// Force-closing the channel is a second persistable update.
		nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id()).unwrap();

		// The post-close state must reach disk as well.
		let mut bytes_after_close = Vec::new();
		check_persisted_data!(nodes[0].node, filepath.clone(), bytes_after_close);
		while nodes[0].node.get_persistence_condvar_value() {}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_chan_freshness_called() {
		// Verifies that the background thread calls ChannelManager's
		// `timer_chan_freshness_every_min` once `CHAN_FRESHNESS_TIMER` has run out, by waiting for
		// the trace line the thread logs right before making the call.
		let nodes = create_nodes(1, "test_chan_freshness_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persist = move |mgr: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), mgr);
		let bg_processor = BackgroundProcessor::start(persist, nodes[0].node.clone(), nodes[0].logger.clone());

		// The test logger's lines are looked up by a (module, message) pair.
		let wanted = ("lightning_background_processor".to_string(), "Calling manager's timer_chan_freshness_every_min".to_string());
		let mut timer_logged = false;
		while !timer_logged {
			// The lock is re-acquired on every pass so the background thread can log in between.
			timer_logged = nodes[0].logger.lines.lock().unwrap().get(&wanted).is_some();
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_persist_error() {
		// Test that if we encounter an error during manager persistence, the background thread
		// exits by *returning* that error (it does not panic), and that the error surfaces
		// unchanged through `thread_handle.join()`.

		// Persistence callback that always fails with an `Other` I/O error carrying "test".
		fn persist_manager<Signer, M, T, K, F, L>(_data: &ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>) -> Result<(), std::io::Error>
		where Signer: 'static + Sign,
		      M: 'static + chain::Watch<Signer>,
		      T: 'static + BroadcasterInterface,
		      K: 'static + KeysInterface<Signer=Signer>,
		      F: 'static + FeeEstimator,
		      L: 'static + Logger,
		{
			Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
		}

		let nodes = create_nodes(2, "test_persist_error".to_string());
		let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].logger.clone());
		// Opening a channel produces a persistable update, which triggers the failing callback.
		open_channel!(nodes[0], nodes[1], 100000);

		// `join()` only fails if the thread panicked; the persistence failure itself is the
		// thread's return value, so check that it is exactly the error the callback produced.
		match bg_processor.thread_handle.join().unwrap() {
			Ok(()) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
}