// aigc_chain/chain.rs

1// Copyright 2021 The Aigc Developers
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Facade and handler for the rest of the blockchain implementation
16//! and mostly the chain pipeline.
17
18use crate::core::core::merkle_proof::MerkleProof;
19use crate::core::core::{
20	Block, BlockHeader, BlockSums, Committed, Inputs, KernelFeatures, Output, OutputIdentifier,
21	Transaction, TxKernel,
22};
23use crate::core::global;
24use crate::core::pow;
25use crate::core::ser::ProtocolVersion;
26use crate::error::{Error, ErrorKind};
27use crate::pipe;
28use crate::store;
29use crate::txhashset;
30use crate::txhashset::{Desegmenter, PMMRHandle, Segmenter, TxHashSet};
31use crate::types::{
32	BlockStatus, ChainAdapter, CommitPos, NoStatus, Options, Tip, TxHashsetWriteStatus,
33};
34use crate::util::secp::pedersen::{Commitment, RangeProof};
35use crate::util::RwLock;
36use crate::{
37	core::core::hash::{Hash, Hashed},
38	store::Batch,
39	txhashset::{ExtensionPair, HeaderExtension},
40};
41use aigc_store::Error::NotFoundErr;
42use std::collections::HashMap;
43use std::fs::{self, File};
44use std::path::{Path, PathBuf};
45use std::sync::atomic::{AtomicUsize, Ordering};
46use std::sync::Arc;
47use std::time::{Duration, Instant};
48
/// Orphan pool size is limited by MAX_ORPHAN_SIZE (number of blocks).
pub const MAX_ORPHAN_SIZE: usize = 200;

/// When evicting, very old orphans are evicted first (age threshold, in seconds).
const MAX_ORPHAN_AGE_SECS: u64 = 300;
54
/// A block we received before its parent, parked for deferred processing.
#[derive(Debug, Clone)]
struct Orphan {
	// the orphan block itself
	block: Block,
	// options the block was originally submitted with (replayed on retry)
	opts: Options,
	// when the orphan was added to the pool (used for age-based eviction)
	added: Instant,
}
61
/// Thread-safe pool of orphan blocks (blocks whose parent we do not yet have).
pub struct OrphanBlockPool {
	// blocks indexed by their hash
	orphans: RwLock<HashMap<Hash, Orphan>>,
	// additional index of height -> hash
	// so we can efficiently identify a child block (ex-orphan) after processing a block
	height_idx: RwLock<HashMap<u64, Vec<Hash>>>,
	// accumulated number of evicted block because of MAX_ORPHAN_SIZE limitation
	evicted: AtomicUsize,
}
71
72impl OrphanBlockPool {
73	fn new() -> OrphanBlockPool {
74		OrphanBlockPool {
75			orphans: RwLock::new(HashMap::new()),
76			height_idx: RwLock::new(HashMap::new()),
77			evicted: AtomicUsize::new(0),
78		}
79	}
80
81	fn len(&self) -> usize {
82		let orphans = self.orphans.read();
83		orphans.len()
84	}
85
86	fn len_evicted(&self) -> usize {
87		self.evicted.load(Ordering::Relaxed)
88	}
89
90	fn add(&self, orphan: Orphan) {
91		let mut orphans = self.orphans.write();
92		let mut height_idx = self.height_idx.write();
93		{
94			let height_hashes = height_idx
95				.entry(orphan.block.header.height)
96				.or_insert_with(|| vec![]);
97			height_hashes.push(orphan.block.hash());
98			orphans.insert(orphan.block.hash(), orphan);
99		}
100
101		if orphans.len() > MAX_ORPHAN_SIZE {
102			let old_len = orphans.len();
103
104			// evict too old
105			orphans.retain(|_, ref mut x| {
106				x.added.elapsed() < Duration::from_secs(MAX_ORPHAN_AGE_SECS)
107			});
108			// evict too far ahead
109			let mut heights = height_idx.keys().cloned().collect::<Vec<u64>>();
110			heights.sort_unstable();
111			for h in heights.iter().rev() {
112				if let Some(hs) = height_idx.remove(h) {
113					for h in hs {
114						let _ = orphans.remove(&h);
115					}
116				}
117				if orphans.len() < MAX_ORPHAN_SIZE {
118					break;
119				}
120			}
121			// cleanup index
122			height_idx.retain(|_, ref mut xs| xs.iter().any(|x| orphans.contains_key(&x)));
123
124			self.evicted
125				.fetch_add(old_len - orphans.len(), Ordering::Relaxed);
126		}
127	}
128
129	/// Get an orphan from the pool indexed by the hash of its parent, removing
130	/// it at the same time, preventing clone
131	fn remove_by_height(&self, height: u64) -> Option<Vec<Orphan>> {
132		let mut orphans = self.orphans.write();
133		let mut height_idx = self.height_idx.write();
134		height_idx
135			.remove(&height)
136			.map(|hs| hs.iter().filter_map(|h| orphans.remove(h)).collect())
137	}
138
139	pub fn contains(&self, hash: &Hash) -> bool {
140		let orphans = self.orphans.read();
141		orphans.contains_key(hash)
142	}
143}
144
/// Facade to the blockchain block processing pipeline and storage. Provides
/// the current view of the TxHashSet according to the chain state. Also
/// maintains locking for the pipeline to avoid conflicting processing.
pub struct Chain {
	// root directory for chain data on disk
	db_root: String,
	// underlying chain db (headers, blocks, indexes)
	store: Arc<store::ChainStore>,
	// adapter used to notify the rest of the system of accepted blocks
	adapter: Arc<dyn ChainAdapter + Send + Sync>,
	// pool of blocks that arrived ahead of their parent
	orphans: Arc<OrphanBlockPool>,
	// output/rangeproof/kernel MMRs and associated indexes
	txhashset: Arc<RwLock<txhashset::TxHashSet>>,
	// header chain MMR
	header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
	// cached PIBD segmenter (lazily initialized per archive period, see segmenter())
	pibd_segmenter: Arc<RwLock<Option<Segmenter>>>,
	// cached PIBD desegmenter (lazily initialized, see desegmenter())
	pibd_desegmenter: Arc<RwLock<Option<Desegmenter>>>,
	// POW verification function
	pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
	// header hashes to reject during validation (see invalidate_header)
	denylist: Arc<RwLock<Vec<Hash>>>,
	// whether we keep full block history (archival node)
	archive_mode: bool,
	// genesis block header, used as the anchor for full validation
	genesis: BlockHeader,
}
163
164impl Chain {
165	/// Initializes the blockchain and returns a new Chain instance. Does a
166	/// check on the current chain head to make sure it exists and creates one
167	/// based on the genesis block if necessary.
	pub fn init(
		db_root: String,
		adapter: Arc<dyn ChainAdapter + Send + Sync>,
		genesis: Block,
		pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
		archive_mode: bool,
	) -> Result<Chain, Error> {
		let store = Arc::new(store::ChainStore::new(&db_root)?);

		// open the txhashset, creating a new one if necessary
		let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;

		// Open (or create) the header MMR backing files under <db_root>/header/header_head.
		let mut header_pmmr = PMMRHandle::new(
			Path::new(&db_root).join("header").join("header_head"),
			false,
			ProtocolVersion(1),
			None,
		)?;

		// Ensure head/header_head exist, creating them from genesis if necessary.
		setup_head(&genesis, &store, &mut header_pmmr, &mut txhashset)?;

		// Initialize the output_pos index based on UTXO set
		// and NRD kernel_pos index based recent kernel history.
		{
			let batch = store.batch()?;
			txhashset.init_output_pos_index(&header_pmmr, &batch)?;
			txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?;
			batch.commit()?;
		}

		let chain = Chain {
			db_root,
			store,
			adapter,
			orphans: Arc::new(OrphanBlockPool::new()),
			txhashset: Arc::new(RwLock::new(txhashset)),
			header_pmmr: Arc::new(RwLock::new(header_pmmr)),
			pibd_segmenter: Arc::new(RwLock::new(None)),
			pibd_desegmenter: Arc::new(RwLock::new(None)),
			pow_verifier,
			denylist: Arc::new(RwLock::new(vec![])),
			archive_mode,
			genesis: genesis.header,
		};

		// Log current head/header_head for visibility at startup.
		chain.log_heads()?;

		Ok(chain)
	}
217
218	/// Add provided header hash to our "denylist".
219	/// The header corresponding to any "denied" hash will be rejected
220	/// and the peer subsequently banned.
221	pub fn invalidate_header(&self, hash: Hash) -> Result<(), Error> {
222		self.denylist.write().push(hash);
223		Ok(())
224	}
225
	/// Reset both head and header_head to the provided header.
	/// Handles simple rewind and more complex fork scenarios.
	/// Used by the reset_chain_head owner api endpoint.
	pub fn reset_chain_head<T: Into<Tip>>(&self, head: T) -> Result<(), Error> {
		let head = head.into();

		// Lock ordering: header MMR, then txhashset, then db batch.
		let mut header_pmmr = self.header_pmmr.write();
		let mut txhashset = self.txhashset.write();
		let mut batch = self.store.batch()?;

		let header = batch.get_block_header(&head.hash())?;

		// Rewind and reapply blocks to reset the output/rangeproof/kernel MMR.
		txhashset::extending(
			&mut header_pmmr,
			&mut txhashset,
			&mut batch,
			|ext, batch| {
				self.rewind_and_apply_fork(&header, ext, batch)?;
				batch.save_body_head(&head)?;
				Ok(())
			},
		)?;

		// If the rewind of full blocks was successful then we can rewind the header MMR.
		// Rewind and reapply headers to reset the header MMR.
		// Note: deliberately done *after* the full block rewind above succeeds.
		txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| {
			self.rewind_and_apply_header_fork(&header, ext, batch)?;
			batch.save_header_head(&head)?;
			Ok(())
		})?;

		// Single commit so both rewinds land atomically in the db.
		batch.commit()?;

		Ok(())
	}
262
	/// Are we running with archive_mode enabled?
	/// (Set once at init; archival nodes keep full block history.)
	pub fn archive_mode(&self) -> bool {
		self.archive_mode
	}
267
268	/// Return our shared header MMR handle.
269	pub fn header_pmmr(&self) -> Arc<RwLock<PMMRHandle<BlockHeader>>> {
270		self.header_pmmr.clone()
271	}
272
273	/// Return our shared txhashset instance.
274	pub fn txhashset(&self) -> Arc<RwLock<TxHashSet>> {
275		self.txhashset.clone()
276	}
277
278	/// Shared store instance.
279	pub fn store(&self) -> Arc<store::ChainStore> {
280		self.store.clone()
281	}
282
283	fn log_heads(&self) -> Result<(), Error> {
284		let log_head = |name, head: Tip| {
285			debug!(
286				"{}: {} @ {} [{}]",
287				name,
288				head.total_difficulty.to_num(),
289				head.height,
290				head.hash(),
291			);
292		};
293		log_head("head", self.head()?);
294		log_head("header_head", self.header_head()?);
295		Ok(())
296	}
297
298	/// Processes a single block, then checks for orphans, processing
299	/// those as well if they're found
300	pub fn process_block(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
301		let height = b.header.height;
302		let res = self.process_block_single(b, opts);
303		if res.is_ok() {
304			self.check_orphans(height + 1);
305		}
306		res
307	}
308
309	fn determine_status(
310		&self,
311		head: Option<Tip>,
312		prev: Tip,
313		prev_head: Tip,
314		fork_point: Tip,
315	) -> BlockStatus {
316		// If head is updated then we are either "next" block or we just experienced a "reorg" to new head.
317		// Otherwise this is a "fork" off the main chain.
318		if let Some(head) = head {
319			if self.is_on_current_chain(prev_head, head).is_ok() {
320				BlockStatus::Next { prev }
321			} else {
322				BlockStatus::Reorg {
323					prev,
324					prev_head,
325					fork_point,
326				}
327			}
328		} else {
329			BlockStatus::Fork {
330				prev,
331				head: prev_head,
332				fork_point,
333			}
334		}
335	}
336
337	/// Quick check for "known" duplicate block up to and including current chain head.
338	/// Returns an error if this block is "known".
339	pub fn is_known(&self, header: &BlockHeader) -> Result<(), Error> {
340		let head = self.head()?;
341		if head.hash() == header.hash() {
342			return Err(ErrorKind::Unfit("duplicate block".into()).into());
343		}
344		if header.total_difficulty() <= head.total_difficulty {
345			if self.block_exists(header.hash())? {
346				return Err(ErrorKind::Unfit("duplicate block".into()).into());
347			}
348		}
349		Ok(())
350	}
351
352	// Check if the provided block is an orphan.
353	// If block is an orphan add it to our orphan block pool for deferred processing.
354	// If this is the "next" block immediately following current head then not an orphan.
355	// Or if we have the previous full block then not an orphan.
356	fn check_orphan(&self, block: &Block, opts: Options) -> Result<(), Error> {
357		let head = self.head()?;
358		let is_next = block.header.prev_hash == head.last_block_h;
359		if is_next || self.block_exists(block.header.prev_hash)? {
360			return Ok(());
361		}
362
363		let block_hash = block.hash();
364		let orphan = Orphan {
365			block: block.clone(),
366			opts,
367			added: Instant::now(),
368		};
369		self.orphans.add(orphan);
370
371		debug!(
372			"is_orphan: {:?}, # orphans {}{}",
373			block_hash,
374			self.orphans.len(),
375			if self.orphans.len_evicted() > 0 {
376				format!(", # evicted {}", self.orphans.len_evicted())
377			} else {
378				String::new()
379			},
380		);
381
382		Err(ErrorKind::Orphan.into())
383	}
384
	/// Attempt to add a new block to the chain.
	/// Returns the new chain head (Some) if it extended the longest chain,
	/// or None if it was added to a fork. Orphans error out via check_orphan.
	fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
		// Process the header first.
		// If invalid then fail early.
		// If valid then continue with block processing with header_head committed to db etc.
		self.process_block_header(&b.header, opts)?;

		// Check if we already know about this full block.
		self.is_known(&b.header)?;

		// Check if this block is an orphan.
		// Only do this once we know the header PoW is valid.
		self.check_orphan(&b, opts)?;

		// Scope the write locks and the db batch tightly: everything is
		// released before we notify the adapter below.
		let (head, fork_point, prev_head) = {
			let mut header_pmmr = self.header_pmmr.write();
			let mut txhashset = self.txhashset.write();
			let batch = self.store.batch()?;
			// Capture the head as it was *before* this block, for status classification.
			let prev_head = batch.head()?;
			let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;

			let (head, fork_point) = pipe::process_block(&b, &mut ctx)?;

			ctx.batch.commit()?;

			// release the lock and let the batch go before post-processing
			(head, fork_point, prev_head)
		};

		let prev = self.get_previous_header(&b.header)?;
		let status = self.determine_status(
			head,
			Tip::from_header(&prev),
			prev_head,
			Tip::from_header(&fork_point),
		);

		// notifying other parts of the system of the update
		self.adapter.block_accepted(&b, status, opts);

		Ok(head)
	}
429
	/// Process a block header received during "header first" propagation.
	/// Note: This will update header MMR and corresponding header_head
	/// if total work increases (on the header chain).
	pub fn process_block_header(&self, bh: &BlockHeader, opts: Options) -> Result<(), Error> {
		// Lock ordering: header MMR, then txhashset, then db batch.
		let mut header_pmmr = self.header_pmmr.write();
		let mut txhashset = self.txhashset.write();
		let batch = self.store.batch()?;
		let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
		pipe::process_block_header(bh, &mut ctx)?;
		// Persist any header_head / MMR changes made by the pipeline.
		ctx.batch.commit()?;
		Ok(())
	}
442
	/// Attempt to add new headers to the header chain (or fork).
	/// This is only ever used during sync and is based on sync_head.
	/// We update header_head here if our total work increases.
	/// Returns the new sync_head (may temporarily diverge from header_head when syncing a long fork).
	pub fn sync_block_headers(
		&self,
		headers: &[BlockHeader],
		sync_head: Tip,
		opts: Options,
	) -> Result<Option<Tip>, Error> {
		// Lock ordering: header MMR, then txhashset, then db batch.
		let mut header_pmmr = self.header_pmmr.write();
		let mut txhashset = self.txhashset.write();
		let batch = self.store.batch()?;

		// Sync the chunk of block headers, updating header_head if total work increases.
		let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
		let sync_head = pipe::process_block_headers(headers, sync_head, &mut ctx)?;
		ctx.batch.commit()?;

		Ok(sync_head)
	}
464
465	/// Build a new block processing context.
466	pub fn new_ctx<'a>(
467		&self,
468		opts: Options,
469		batch: store::Batch<'a>,
470		header_pmmr: &'a mut txhashset::PMMRHandle<BlockHeader>,
471		txhashset: &'a mut txhashset::TxHashSet,
472	) -> Result<pipe::BlockContext<'a>, Error> {
473		let denylist = self.denylist.read().clone();
474		Ok(pipe::BlockContext {
475			opts,
476			pow_verifier: self.pow_verifier,
477			header_allowed: Box::new(move |header| {
478				pipe::validate_header_denylist(header, &denylist)
479			}),
480			header_pmmr,
481			txhashset,
482			batch,
483		})
484	}
485
	/// Check if hash is for a known orphan (currently parked in the orphan pool).
	pub fn is_orphan(&self, hash: &Hash) -> bool {
		self.orphans.contains(hash)
	}
490
	/// Get the OrphanBlockPool accumulated evicted number of blocks
	/// (monotonic counter since startup).
	pub fn orphans_evicted_len(&self) -> usize {
		self.orphans.len_evicted()
	}
495
	/// Check for orphans, once a block is successfully added.
	/// Starting at `height`, repeatedly drain the orphan pool at that height,
	/// process each orphan, and advance to the next height while any orphan
	/// at the current height is accepted.
	fn check_orphans(&self, mut height: u64) {
		let initial_height = height;

		// Is there an orphan in our orphans that we can now process?
		loop {
			trace!(
				"check_orphans: at {}, # orphans {}",
				height,
				self.orphans.len(),
			);

			let mut orphan_accepted = false;
			let mut height_accepted = height;

			if let Some(orphans) = self.orphans.remove_by_height(height) {
				let orphans_len = orphans.len();
				for (i, orphan) in orphans.into_iter().enumerate() {
					debug!(
						"check_orphans: get block {} at {}{}",
						orphan.block.hash(),
						height,
						if orphans_len > 1 {
							format!(", no.{} of {} orphans", i, orphans_len)
						} else {
							String::new()
						},
					);
					// Shadowed on purpose: capture the orphan's own height
					// before the block is moved into the pipeline.
					let height = orphan.block.header.height;
					let res = self.process_block_single(orphan.block, orphan.opts);
					if res.is_ok() {
						orphan_accepted = true;
						height_accepted = height;
					}
				}

				if orphan_accepted {
					// We accepted a block, so see if we can accept any orphans
					height = height_accepted + 1;
					continue;
				}
			}
			break;
		}

		if initial_height != height {
			debug!(
				"check_orphans: {} blocks accepted since height {}, remaining # orphans {}",
				height - initial_height,
				initial_height,
				self.orphans.len(),
			);
		}
	}
550
551	/// Returns Ok(Some((out, pos))) if output is unspent.
552	/// Returns Ok(None) if output is spent.
553	/// Returns Err if something went wrong beyond not finding the output.
554	pub fn get_unspent(
555		&self,
556		commit: Commitment,
557	) -> Result<Option<(OutputIdentifier, CommitPos)>, Error> {
558		self.txhashset.read().get_unspent(commit)
559	}
560
561	/// Retrieves an unspent output using its PMMR position
562	pub fn get_unspent_output_at(&self, pos0: u64) -> Result<Output, Error> {
563		let header_pmmr = self.header_pmmr.read();
564		let txhashset = self.txhashset.read();
565		txhashset::utxo_view(&header_pmmr, &txhashset, |utxo, _| {
566			utxo.get_unspent_output_at(pos0)
567		})
568	}
569
570	/// Validate the tx against the current UTXO set and recent kernels (NRD relative lock heights).
571	pub fn validate_tx(&self, tx: &Transaction) -> Result<(), Error> {
572		self.validate_tx_against_utxo(tx)?;
573		self.validate_tx_kernels(tx)?;
574		Ok(())
575	}
576
577	/// Validates NRD relative height locks against "recent" kernel history.
578	/// Applies the kernels to the current kernel MMR in a readonly extension.
579	/// The extension and the db batch are discarded.
580	/// The batch ensures duplicate NRD kernels within the tx are handled correctly.
581	fn validate_tx_kernels(&self, tx: &Transaction) -> Result<(), Error> {
582		let has_nrd_kernel = tx.kernels().iter().any(|k| match k.features {
583			KernelFeatures::NoRecentDuplicate { .. } => true,
584			_ => false,
585		});
586		if !has_nrd_kernel {
587			return Ok(());
588		}
589		let mut header_pmmr = self.header_pmmr.write();
590		let mut txhashset = self.txhashset.write();
591		txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
592			let height = self.next_block_height()?;
593			ext.extension.apply_kernels(tx.kernels(), height, batch)
594		})
595	}
596
597	fn validate_tx_against_utxo(
598		&self,
599		tx: &Transaction,
600	) -> Result<Vec<(OutputIdentifier, CommitPos)>, Error> {
601		let header_pmmr = self.header_pmmr.read();
602		let txhashset = self.txhashset.read();
603		txhashset::utxo_view(&header_pmmr, &txhashset, |utxo, batch| {
604			utxo.validate_tx(tx, batch)
605		})
606	}
607
608	/// Validates inputs against the current utxo.
609	/// Each input must spend an unspent output.
610	/// Returns the vec of output identifiers and their pos of the outputs
611	/// that would be spent by the inputs.
612	pub fn validate_inputs(
613		&self,
614		inputs: &Inputs,
615	) -> Result<Vec<(OutputIdentifier, CommitPos)>, Error> {
616		let header_pmmr = self.header_pmmr.read();
617		let txhashset = self.txhashset.read();
618		txhashset::utxo_view(&header_pmmr, &txhashset, |utxo, batch| {
619			utxo.validate_inputs(inputs, batch)
620		})
621	}
622
623	fn next_block_height(&self) -> Result<u64, Error> {
624		let bh = self.head_header()?;
625		Ok(bh.height + 1)
626	}
627
628	/// Verify we are not attempting to spend a coinbase output
629	/// that has not yet sufficiently matured.
630	pub fn verify_coinbase_maturity(&self, inputs: &Inputs) -> Result<(), Error> {
631		let height = self.next_block_height()?;
632		let header_pmmr = self.header_pmmr.read();
633		let txhashset = self.txhashset.read();
634		txhashset::utxo_view(&header_pmmr, &txhashset, |utxo, batch| {
635			utxo.verify_coinbase_maturity(inputs, height, batch)?;
636			Ok(())
637		})
638	}
639
640	/// Verify that the tx has a lock_height that is less than or equal to
641	/// the height of the next block.
642	pub fn verify_tx_lock_height(&self, tx: &Transaction) -> Result<(), Error> {
643		let height = self.next_block_height()?;
644		if tx.lock_height() <= height {
645			Ok(())
646		} else {
647			Err(ErrorKind::TxLockHeight.into())
648		}
649	}
650
	/// Validate the current chain state.
	/// With `fast_validation` set, the (expensive) rangeproof and kernel
	/// signature checks are assumed to be controlled elsewhere — TODO confirm
	/// against the extension's validate() implementation.
	pub fn validate(&self, fast_validation: bool) -> Result<(), Error> {
		let header = self.store.head_header()?;

		// Lets just treat an "empty" node that just got started up as valid.
		if header.height == 0 {
			return Ok(());
		}

		let mut header_pmmr = self.header_pmmr.write();
		let mut txhashset = self.txhashset.write();

		// Now create an extension from the txhashset and validate against the
		// latest block header. Rewind the extension to the specified header to
		// ensure the view is consistent.
		txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
			self.rewind_and_apply_fork(&header, ext, batch)?;
			ext.extension
				.validate(&self.genesis, fast_validation, &NoStatus, &header)?;
			Ok(())
		})
	}
673
674	/// Sets prev_root on a brand new block header by applying the previous header to the header MMR.
675	pub fn set_prev_root_only(&self, header: &mut BlockHeader) -> Result<(), Error> {
676		let mut header_pmmr = self.header_pmmr.write();
677		let prev_root =
678			txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
679				let prev_header = batch.get_previous_header(header)?;
680				self.rewind_and_apply_header_fork(&prev_header, ext, batch)?;
681				ext.root()
682			})?;
683
684		// Set the prev_root on the header.
685		header.prev_root = prev_root;
686
687		Ok(())
688	}
689
	/// Sets the txhashset roots on a brand new block by applying the block on
	/// the current txhashset state.
	pub fn set_txhashset_roots(&self, b: &mut Block) -> Result<(), Error> {
		let mut header_pmmr = self.header_pmmr.write();
		let mut txhashset = self.txhashset.write();

		// Apply the new block in a readonly extension (discarded afterwards)
		// to learn the resulting roots and MMR sizes.
		let (prev_root, roots, sizes) =
			txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
				let previous_header = batch.get_previous_header(&b.header)?;
				self.rewind_and_apply_fork(&previous_header, ext, batch)?;

				let extension = &mut ext.extension;
				let header_extension = &mut ext.header_extension;

				// Retrieve the header root before we apply the new block
				let prev_root = header_extension.root()?;

				// Apply the latest block to the chain state via the extension.
				extension.apply_block(b, header_extension, batch)?;

				Ok((prev_root, extension.roots()?, extension.sizes()))
			})?;

		// Set the output and kernel MMR sizes.
		// Note: We need to do this *before* calculating the roots as the output_root
		// depends on the output_mmr_size
		{
			// Carefully destructure these correctly...
			// (middle element is the rangeproof MMR size, not needed here)
			let (output_mmr_size, _, kernel_mmr_size) = sizes;
			b.header.output_mmr_size = output_mmr_size;
			b.header.kernel_mmr_size = kernel_mmr_size;
		}

		// Set the prev_root on the header.
		b.header.prev_root = prev_root;

		// Set the output, rangeproof and kernel MMR roots.
		b.header.output_root = roots.output_root(&b.header);
		b.header.range_proof_root = roots.rproof_root;
		b.header.kernel_root = roots.kernel_root;

		Ok(())
	}
733
734	/// Return a Merkle proof for the given commitment from the store.
735	pub fn get_merkle_proof<T: AsRef<OutputIdentifier>>(
736		&self,
737		out_id: T,
738		header: &BlockHeader,
739	) -> Result<MerkleProof, Error> {
740		let mut header_pmmr = self.header_pmmr.write();
741		let mut txhashset = self.txhashset.write();
742		let merkle_proof =
743			txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
744				self.rewind_and_apply_fork(&header, ext, batch)?;
745				ext.extension.merkle_proof(out_id, batch)
746			})?;
747
748		Ok(merkle_proof)
749	}
750
751	/// Return a merkle proof valid for the current output pmmr state at the
752	/// given pos
753	pub fn get_merkle_proof_for_pos(&self, commit: Commitment) -> Result<MerkleProof, Error> {
754		let mut txhashset = self.txhashset.write();
755		txhashset.merkle_proof(commit)
756	}
757
758	/// Rewind and apply fork with the chain specific header validation (denylist) rules.
759	/// If we rewind and re-apply a "denied" block then validation will fail.
760	fn rewind_and_apply_fork(
761		&self,
762		header: &BlockHeader,
763		ext: &mut ExtensionPair,
764		batch: &Batch,
765	) -> Result<BlockHeader, Error> {
766		let denylist = self.denylist.read().clone();
767		pipe::rewind_and_apply_fork(header, ext, batch, &|header| {
768			pipe::validate_header_denylist(header, &denylist)
769		})
770	}
771
772	/// Rewind and apply fork with the chain specific header validation (denylist) rules.
773	/// If we rewind and re-apply a "denied" header then validation will fail.
774	fn rewind_and_apply_header_fork(
775		&self,
776		header: &BlockHeader,
777		ext: &mut HeaderExtension,
778		batch: &Batch,
779	) -> Result<(), Error> {
780		let denylist = self.denylist.read().clone();
781		pipe::rewind_and_apply_header_fork(header, ext, batch, &|header| {
782			pipe::validate_header_denylist(header, &denylist)
783		})
784	}
785
	/// Provides a reading view into the current txhashset state as well as
	/// the required indexes for a consumer to rewind to a consistent state
	/// at the provided block hash.
	/// Returns (output_mmr_size, kernel_mmr_size, zip_file) for the header.
	pub fn txhashset_read(&self, h: Hash) -> Result<(u64, u64, File), Error> {
		// now we want to rewind the txhashset extension and
		// sync a "rewound" copy of the leaf_set files to disk
		// so we can send these across as part of the zip file.
		// The fast sync client does *not* have the necessary data
		// to rewind after receiving the txhashset zip.
		let header = self.get_block_header(&h)?;

		let mut header_pmmr = self.header_pmmr.write();
		let mut txhashset = self.txhashset.write();

		txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
			self.rewind_and_apply_fork(&header, ext, batch)?;
			// Snapshot the rewound leaf sets to disk before zipping.
			ext.extension.snapshot(batch)?;

			// prepare the zip
			txhashset::zip_read(self.db_root.clone(), &header)
				.map(|file| (header.output_mmr_size, header.kernel_mmr_size, file))
		})
	}
809
810	/// The segmenter is responsible for generation PIBD segments.
811	/// We cache a segmenter instance based on the current archve period (new period every 12 hours).
812	/// This allows us to efficiently generate bitmap segments for the current archive period.
813	///
814	/// It is a relatively expensive operation to initializa and cache a new segmenter instance
815	/// as this involves rewinding the txhashet by approx 720 blocks (12 hours).
816	///
817	/// Caller is responsible for only doing this when required.
818	/// Caller should verify a peer segment request is valid before calling this for example.
819	///
820	pub fn segmenter(&self) -> Result<Segmenter, Error> {
821		// The archive header corresponds to the data we will segment.
822		let ref archive_header = self.txhashset_archive_header()?;
823
824		// Use our cached segmenter if we have one and the associated header matches.
825		if let Some(x) = self.pibd_segmenter.read().as_ref() {
826			if x.header() == archive_header {
827				return Ok(x.clone());
828			}
829		}
830
831		// We have no cached segmenter or the cached segmenter is no longer useful.
832		// Initialize a new segment, cache it and return it.
833		let segmenter = self.init_segmenter(archive_header)?;
834		let mut cache = self.pibd_segmenter.write();
835		*cache = Some(segmenter.clone());
836
837		return Ok(segmenter);
838	}
839
840	/// This is an expensive rewind to recreate bitmap state but we only need to do this once.
841	/// Caller is responsible for "caching" the segmenter (per archive period) for reuse.
842	fn init_segmenter(&self, header: &BlockHeader) -> Result<Segmenter, Error> {
843		let now = Instant::now();
844		debug!(
845			"init_segmenter: initializing new segmenter for {} at {}",
846			header.hash(),
847			header.height
848		);
849
850		let mut header_pmmr = self.header_pmmr.write();
851		let mut txhashset = self.txhashset.write();
852
853		let bitmap_snapshot =
854			txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
855				ext.extension.rewind(header, batch)?;
856				Ok(ext.extension.bitmap_accumulator())
857			})?;
858
859		debug!("init_segmenter: done, took {}ms", now.elapsed().as_millis());
860
861		Ok(Segmenter::new(
862			self.txhashset(),
863			Arc::new(bitmap_snapshot),
864			header.clone(),
865		))
866	}
867
868	/// instantiate desegmenter (in same lazy fashion as segmenter, though this should not be as
869	/// expensive an operation)
870	pub fn desegmenter(&self, archive_header: &BlockHeader) -> Result<Desegmenter, Error> {
871		// Use our cached desegmenter if we have one and the associated header matches.
872		if let Some(d) = self.pibd_desegmenter.read().as_ref() {
873			if d.header() == archive_header {
874				return Ok(d.clone());
875			}
876		}
877		// If no desegmenter or headers don't match init
878		// TODO: (Check whether we can do this.. we *should* be able to modify this as the desegmenter
879		// is in flight and we cross a horizon boundary, but needs more thinking)
880		let desegmenter = self.init_desegmenter(archive_header)?;
881		let mut cache = self.pibd_desegmenter.write();
882		*cache = Some(desegmenter.clone());
883
884		return Ok(desegmenter);
885	}
886
887	/// initialize a desegmenter, which is capable of extending the hashset by appending
888	/// PIBD segments of the three PMMR trees + Bitmap PMMR
889	/// header should be the same header as selected for the txhashset.zip archive
890	fn init_desegmenter(&self, header: &BlockHeader) -> Result<Desegmenter, Error> {
891		debug!(
892			"init_desegmenter: initializing new desegmenter for {} at {}",
893			header.hash(),
894			header.height
895		);
896
897		Ok(Desegmenter::new(
898			self.txhashset(),
899			self.header_pmmr.clone(),
900			header.clone(),
901			self.store.clone(),
902		))
903	}
904
905	/// To support the ability to download the txhashset from multiple peers in parallel,
906	/// the peers must all agree on the exact binary representation of the txhashset.
907	/// This means compacting and rewinding to the exact same header.
908	/// Since compaction is a heavy operation, peers can agree to compact every 12 hours,
909	/// and no longer support requesting arbitrary txhashsets.
910	/// Here we return the header of the txhashset we are currently offering to peers.
911	pub fn txhashset_archive_header(&self) -> Result<BlockHeader, Error> {
912		let sync_threshold = global::state_sync_threshold() as u64;
913		let body_head = self.head()?;
914		let archive_interval = global::txhashset_archive_interval();
915		let mut txhashset_height = body_head.height.saturating_sub(sync_threshold);
916		txhashset_height = txhashset_height.saturating_sub(txhashset_height % archive_interval);
917
918		debug!(
919			"txhashset_archive_header: body_head - {}, {}, txhashset height - {}",
920			body_head.last_block_h, body_head.height, txhashset_height,
921		);
922
923		self.get_header_by_height(txhashset_height)
924	}
925
926	// Special handling to make sure the whole kernel set matches each of its
927	// roots in each block header, without truncation. We go back header by
928	// header, rewind and check each root. This fixes a potential weakness in
929	// fast sync where a reorg past the horizon could allow a whole rewrite of
930	// the kernel set.
931	fn validate_kernel_history(
932		&self,
933		header: &BlockHeader,
934		txhashset: &txhashset::TxHashSet,
935	) -> Result<(), Error> {
936		debug!("validate_kernel_history: rewinding and validating kernel history (readonly)");
937
938		let mut count = 0;
939		let mut current = header.clone();
940		txhashset::rewindable_kernel_view(&txhashset, |view, batch| {
941			while current.height > 0 {
942				view.rewind(&current)?;
943				view.validate_root()?;
944				current = batch.get_previous_header(&current)?;
945				count += 1;
946			}
947			Ok(())
948		})?;
949
950		debug!(
951			"validate_kernel_history: validated kernel root on {} headers",
952			count,
953		);
954
955		Ok(())
956	}
957
958	/// Finds the "fork point" where header chain diverges from full block chain.
959	/// If we are syncing this will correspond to the last full block where
960	/// the next header is known but we do not yet have the full block.
961	/// i.e. This is the last known full block and all subsequent blocks are missing.
962	pub fn fork_point(&self) -> Result<BlockHeader, Error> {
963		let body_head = self.head()?;
964		let mut current = self.get_block_header(&body_head.hash())?;
965		while !self.is_on_current_chain(&current, body_head).is_ok() {
966			current = self.get_previous_header(&current)?;
967		}
968		Ok(current)
969	}
970
971	/// Compare fork point to our horizon.
972	/// If beyond the horizon then we cannot sync via recent full blocks
973	/// and we need a state (txhashset) sync.
974	pub fn check_txhashset_needed(&self, fork_point: &BlockHeader) -> Result<bool, Error> {
975		if self.archive_mode() {
976			debug!("check_txhashset_needed: we are running with archive_mode=true, not needed");
977			return Ok(false);
978		}
979
980		let header_head = self.header_head()?;
981		let horizon = global::cut_through_horizon() as u64;
982		Ok(fork_point.height < header_head.height.saturating_sub(horizon))
983	}
984
985	/// Clean the temporary sandbox folder
986	pub fn clean_txhashset_sandbox(&self) {
987		txhashset::clean_txhashset_folder(&self.get_tmp_dir());
988	}
989
990	/// Specific tmp dir.
991	/// Normally it's ~/.aigc/main/tmp for mainnet
992	/// or ~/.aigc/test/tmp for Testnet
993	pub fn get_tmp_dir(&self) -> PathBuf {
994		let mut tmp_dir = PathBuf::from(self.db_root.clone());
995		tmp_dir = tmp_dir
996			.parent()
997			.expect("fail to get parent of db_root dir")
998			.to_path_buf();
999		tmp_dir.push("tmp");
1000		tmp_dir
1001	}
1002
1003	/// Get a tmp file path in above specific tmp dir (create tmp dir if not exist)
1004	/// Delete file if tmp file already exists
1005	pub fn get_tmpfile_pathname(&self, tmpfile_name: String) -> PathBuf {
1006		let mut tmp = self.get_tmp_dir();
1007		if !tmp.exists() {
1008			if let Err(e) = fs::create_dir(tmp.clone()) {
1009				warn!("fail to create tmp folder on {:?}. err: {}", tmp, e);
1010			}
1011		}
1012		tmp.push(tmpfile_name);
1013		if tmp.exists() {
1014			if let Err(e) = fs::remove_file(tmp.clone()) {
1015				warn!("fail to clean existing tmp file: {:?}. err: {}", tmp, e);
1016			}
1017		}
1018		tmp
1019	}
1020
	/// Writes a reading view on a txhashset state that's been provided to us.
	/// If we're willing to accept that new state, the data stream will be
	/// read as a zip file, unzipped and the resulting state files should be
	/// rewound to the provided indexes.
	///
	/// Returns `Ok(true)` when the sender is bannable (we cannot find the
	/// block header for `h`), `Ok(false)` after successfully adopting the
	/// new txhashset.
	pub fn txhashset_write(
		&self,
		h: Hash,
		txhashset_data: File,
		status: &dyn TxHashsetWriteStatus,
	) -> Result<bool, Error> {
		status.on_setup();

		// Initial check whether this txhashset is needed or not.
		// If our fork point is within the horizon we can sync via full blocks instead.
		let fork_point = self.fork_point()?;
		if !self.check_txhashset_needed(&fork_point)? {
			warn!("txhashset_write: txhashset received but it's not needed! ignored.");
			return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into());
		}

		let header = match self.get_block_header(&h) {
			Ok(header) => header,
			Err(_) => {
				warn!("txhashset_write: cannot find block header");
				// This is a bannable reason
				return Ok(true);
			}
		};

		// Write txhashset to sandbox (in the Aigc specific tmp dir) so the live
		// txhashset remains untouched until full validation succeeds below.
		let sandbox_dir = self.get_tmp_dir();
		txhashset::clean_txhashset_folder(&sandbox_dir);
		txhashset::zip_write(sandbox_dir.clone(), txhashset_data.try_clone()?, &header)?;

		let mut txhashset = txhashset::TxHashSet::open(
			sandbox_dir
				.to_str()
				.expect("invalid sandbox folder")
				.to_owned(),
			self.store.clone(),
			Some(&header),
		)?;

		// Validate the full kernel history.
		// Check kernel MMR root for every block header.
		// Check NRD relative height rules for full kernel history.
		{
			self.validate_kernel_history(&header, &txhashset)?;

			let header_pmmr = self.header_pmmr.read();
			let batch = self.store.batch()?;
			txhashset.verify_kernel_pos_index(&self.genesis, &header_pmmr, &batch)?;
		}

		// all good, prepare a new batch and update all the required records
		debug!("txhashset_write: rewinding a 2nd time (writeable)");

		let mut header_pmmr = self.header_pmmr.write();
		let mut batch = self.store.batch()?;
		txhashset::extending(
			&mut header_pmmr,
			&mut txhashset,
			&mut batch,
			|ext, batch| {
				let extension = &mut ext.extension;
				extension.rewind(&header, batch)?;

				// Validate the extension, generating the utxo_sum and kernel_sum.
				// Full validation, including rangeproofs and kernel signature verification.
				let (utxo_sum, kernel_sum) =
					extension.validate(&self.genesis, false, status, &header)?;

				// Save the block_sums (utxo_sum, kernel_sum) to the db for use later.
				batch.save_block_sums(
					&header.hash(),
					BlockSums {
						utxo_sum,
						kernel_sum,
					},
				)?;

				Ok(())
			},
		)?;

		debug!("txhashset_write: finished validating and rebuilding");

		status.on_save();

		// Save the new head to the db and rebuild the header by height index.
		{
			let tip = Tip::from_header(&header);
			batch.save_body_head(&tip)?;

			// Reset the body tail to the body head after a txhashset write
			batch.save_body_tail(&tip)?;
		}

		// Rebuild our output_pos index in the db based on fresh UTXO set.
		txhashset.init_output_pos_index(&header_pmmr, &batch)?;

		// Rebuild our NRD kernel_pos index based on recent kernel history.
		txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?;

		// Commit all the changes to the db.
		batch.commit()?;

		debug!("txhashset_write: finished committing the batch (head etc.)");

		// Sandbox full validation ok, go to overwrite txhashset on db root
		{
			let mut txhashset_ref = self.txhashset.write();

			// Before overwriting, drop file handlers in underlying txhashset
			txhashset_ref.release_backend_files();

			// Move sandbox to overwrite
			txhashset.release_backend_files();
			txhashset::txhashset_replace(sandbox_dir, PathBuf::from(self.db_root.clone()))?;

			// Re-open on db root dir
			txhashset = txhashset::TxHashSet::open(
				self.db_root.clone(),
				self.store.clone(),
				Some(&header),
			)?;

			// Replace the chain txhashset with the newly built one.
			*txhashset_ref = txhashset;
		}

		debug!("txhashset_write: replaced our txhashset with the new one");

		status.on_done();

		Ok(false)
	}
1157
	/// Cleanup old blocks from the db.
	/// Determine the cutoff height from the horizon and the current block height.
	/// *Only* runs if we are not in archive mode.
	fn remove_historical_blocks(
		&self,
		header_pmmr: &txhashset::PMMRHandle<BlockHeader>,
		batch: &store::Batch<'_>,
	) -> Result<(), Error> {
		// Archive nodes keep all historical blocks; nothing to do.
		if self.archive_mode() {
			return Ok(());
		}

		let mut horizon = global::cut_through_horizon() as u64;

		let head = batch.head()?;

		// If no tail is recorded yet fall back to genesis as the tail.
		let tail = match batch.tail() {
			Ok(tail) => tail,
			Err(_) => Tip::from_header(&self.genesis),
		};

		let mut cutoff = head.height.saturating_sub(horizon);

		// TODO: Check this, compaction selects a different horizon
		// block from txhashset horizon/PIBD segmenter when using
		// Automated testing chain
		//
		// Never cut off beyond the txhashset archive header we offer to peers;
		// widen/narrow the horizon accordingly so both stay consistent.
		let archive_header = self.txhashset_archive_header()?;
		if archive_header.height < cutoff {
			cutoff = archive_header.height;
			horizon = head.height - archive_header.height;
		}

		debug!(
			"remove_historical_blocks: head height: {}, tail height: {}, horizon: {}, cutoff: {}",
			head.height, tail.height, horizon, cutoff,
		);

		// A zero cutoff means the chain is still shorter than the horizon.
		if cutoff == 0 {
			return Ok(());
		}

		let mut count = 0;
		// NOTE: `tail` is deliberately re-bound here to the header at the new
		// tail height (head - horizon), replacing the old tail read above.
		let tail_hash = header_pmmr.get_header_hash_by_height(head.height - horizon)?;
		let tail = batch.get_block_header(&tail_hash)?;

		// Remove old blocks (including short lived fork blocks) which height < tail.height
		for block in batch.blocks_iter()? {
			if block.header.height < tail.height {
				let _ = batch.delete_block(&block.hash());
				count += 1;
			}
		}

		batch.save_body_tail(&Tip::from_header(&tail))?;

		debug!(
			"remove_historical_blocks: removed {} blocks. tail height: {}",
			count, tail.height
		);

		Ok(())
	}
1220
	/// Triggers chain compaction.
	///
	/// * compacts the txhashset based on current prune_list
	/// * removes historical blocks and associated data from the db (unless archive mode)
	///
	pub fn compact(&self) -> Result<(), Error> {
		// A node may be restarted multiple times in a short period of time.
		// We compact at most once per 60 blocks in this situation by comparing
		// current "head" and "tail" height to our cut-through horizon and
		// allowing an additional 60 blocks in height before allowing a further compaction.
		if let (Ok(tail), Ok(head)) = (self.tail(), self.head()) {
			let horizon = global::cut_through_horizon() as u64;
			let threshold = horizon.saturating_add(60);
			let next_compact = tail.height.saturating_add(threshold);
			if next_compact > head.height {
				debug!(
					"compact: skipping startup compaction (next at {})",
					next_compact
				);
				return Ok(());
			}
		}

		// Take a write lock on the txhashet and start a new writeable db batch.
		// NOTE: lock order here is header_pmmr (read) then txhashset (write),
		// matching the rest of this file.
		let header_pmmr = self.header_pmmr.read();
		let mut txhashset = self.txhashset.write();
		let batch = self.store.batch()?;

		// Compact the txhashset itself (rewriting the pruned backend files).
		{
			let head_header = batch.head_header()?;
			let current_height = head_header.height;
			let horizon_height =
				current_height.saturating_sub(global::cut_through_horizon().into());
			let horizon_hash = header_pmmr.get_header_hash_by_height(horizon_height)?;
			let horizon_header = batch.get_block_header(&horizon_hash)?;

			txhashset.compact(&horizon_header, &batch)?;
		}

		// If we are not in archival mode remove historical blocks from the db.
		if !self.archive_mode() {
			self.remove_historical_blocks(&header_pmmr, &batch)?;
		}

		// Make sure our output_pos index is consistent with the UTXO set.
		txhashset.init_output_pos_index(&header_pmmr, &batch)?;

		// TODO - Why is this part of chain compaction?
		// Rebuild our NRD kernel_pos index based on recent kernel history.
		txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?;

		// Commit all the above db changes.
		batch.commit()?;

		Ok(())
	}
1278
1279	/// returns the last n nodes inserted into the output sum tree
1280	pub fn get_last_n_output(&self, distance: u64) -> Vec<(Hash, OutputIdentifier)> {
1281		self.txhashset.read().last_n_output(distance)
1282	}
1283
1284	/// as above, for rangeproofs
1285	pub fn get_last_n_rangeproof(&self, distance: u64) -> Vec<(Hash, RangeProof)> {
1286		self.txhashset.read().last_n_rangeproof(distance)
1287	}
1288
1289	/// as above, for kernels
1290	pub fn get_last_n_kernel(&self, distance: u64) -> Vec<(Hash, TxKernel)> {
1291		self.txhashset.read().last_n_kernel(distance)
1292	}
1293
1294	/// Return Commit's MMR position
1295	pub fn get_output_pos(&self, commit: &Commitment) -> Result<u64, Error> {
1296		Ok(self.txhashset.read().get_output_pos(commit)?)
1297	}
1298
1299	/// outputs by insertion index
1300	pub fn unspent_outputs_by_pmmr_index(
1301		&self,
1302		start_index: u64,
1303		max_count: u64,
1304		max_pmmr_index: Option<u64>,
1305	) -> Result<(u64, u64, Vec<Output>), Error> {
1306		let txhashset = self.txhashset.read();
1307		let last_index = match max_pmmr_index {
1308			Some(i) => i,
1309			None => txhashset.output_mmr_size(),
1310		};
1311		let outputs = txhashset.outputs_by_pmmr_index(start_index, max_count, max_pmmr_index);
1312		let rangeproofs =
1313			txhashset.rangeproofs_by_pmmr_index(start_index, max_count, max_pmmr_index);
1314		if outputs.0 != rangeproofs.0 || outputs.1.len() != rangeproofs.1.len() {
1315			return Err(ErrorKind::TxHashSetErr(String::from(
1316				"Output and rangeproof sets don't match",
1317			))
1318			.into());
1319		}
1320		let mut output_vec: Vec<Output> = vec![];
1321		for (ref x, &y) in outputs.1.iter().zip(rangeproofs.1.iter()) {
1322			output_vec.push(Output::new(x.features, x.commitment(), y));
1323		}
1324		Ok((outputs.0, last_index, output_vec))
1325	}
1326
1327	/// Return unspent outputs as above, but bounded between a particular range of blocks
1328	pub fn block_height_range_to_pmmr_indices(
1329		&self,
1330		start_block_height: u64,
1331		end_block_height: Option<u64>,
1332	) -> Result<(u64, u64), Error> {
1333		let end_block_height = match end_block_height {
1334			Some(h) => h,
1335			None => self.head_header()?.height,
1336		};
1337		// Return headers at the given heights
1338		let start_mmr_size = if start_block_height == 0 {
1339			0
1340		} else {
1341			self.get_header_by_height(start_block_height - 1)?
1342				.output_mmr_size + 1
1343		};
1344		let end_mmr_size = self.get_header_by_height(end_block_height)?.output_mmr_size;
1345		Ok((start_mmr_size, end_mmr_size))
1346	}
1347
1348	/// Orphans pool size
1349	pub fn orphans_len(&self) -> usize {
1350		self.orphans.len()
1351	}
1352
1353	/// Tip (head) of the block chain.
1354	pub fn head(&self) -> Result<Tip, Error> {
1355		self.store
1356			.head()
1357			.map_err(|e| ErrorKind::StoreErr(e, "chain head".to_owned()).into())
1358	}
1359
1360	/// Tail of the block chain in this node after compact (cross-block cut-through)
1361	pub fn tail(&self) -> Result<Tip, Error> {
1362		self.store
1363			.tail()
1364			.map_err(|e| ErrorKind::StoreErr(e, "chain tail".to_owned()).into())
1365	}
1366
1367	/// Tip (head) of the header chain.
1368	pub fn header_head(&self) -> Result<Tip, Error> {
1369		self.store
1370			.header_head()
1371			.map_err(|e| ErrorKind::StoreErr(e, "header head".to_owned()).into())
1372	}
1373
1374	/// Block header for the chain head
1375	pub fn head_header(&self) -> Result<BlockHeader, Error> {
1376		self.store
1377			.head_header()
1378			.map_err(|e| ErrorKind::StoreErr(e, "chain head header".to_owned()).into())
1379	}
1380
1381	/// Gets a block by hash
1382	pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
1383		self.store
1384			.get_block(h)
1385			.map_err(|e| ErrorKind::StoreErr(e, "chain get block".to_owned()).into())
1386	}
1387
1388	/// Gets a block header by hash
1389	pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
1390		self.store
1391			.get_block_header(h)
1392			.map_err(|e| ErrorKind::StoreErr(e, "chain get header".to_owned()).into())
1393	}
1394
1395	/// Get previous block header.
1396	pub fn get_previous_header(&self, header: &BlockHeader) -> Result<BlockHeader, Error> {
1397		self.store
1398			.get_previous_header(header)
1399			.map_err(|e| ErrorKind::StoreErr(e, "chain get previous header".to_owned()).into())
1400	}
1401
1402	/// Get block_sums by header hash.
1403	pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
1404		self.store
1405			.get_block_sums(h)
1406			.map_err(|e| ErrorKind::StoreErr(e, "chain get block_sums".to_owned()).into())
1407	}
1408
1409	/// Gets the block header at the provided height.
1410	/// Note: Takes a read lock on the header_pmmr.
1411	pub fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, Error> {
1412		let hash = self.get_header_hash_by_height(height)?;
1413		self.get_block_header(&hash)
1414	}
1415
1416	/// Gets the header hash at the provided height.
1417	/// Note: Takes a read lock on the header_pmmr.
1418	fn get_header_hash_by_height(&self, height: u64) -> Result<Hash, Error> {
1419		self.header_pmmr.read().get_header_hash_by_height(height)
1420	}
1421
1422	/// Gets the block header in which a given output appears in the txhashset.
1423	pub fn get_header_for_output(&self, commit: Commitment) -> Result<BlockHeader, Error> {
1424		let header_pmmr = self.header_pmmr.read();
1425		let txhashset = self.txhashset.read();
1426		let (_, pos) = match txhashset.get_unspent(commit)? {
1427			Some(o) => o,
1428			None => return Err(ErrorKind::OutputNotFound.into()),
1429		};
1430		let hash = header_pmmr.get_header_hash_by_height(pos.height)?;
1431		Ok(self.get_block_header(&hash)?)
1432	}
1433
1434	/// Gets the kernel with a given excess and the block height it is included in.
1435	pub fn get_kernel_height(
1436		&self,
1437		excess: &Commitment,
1438		min_height: Option<u64>,
1439		max_height: Option<u64>,
1440	) -> Result<Option<(TxKernel, u64, u64)>, Error> {
1441		let head = self.head()?;
1442
1443		if let (Some(min), Some(max)) = (min_height, max_height) {
1444			if min > max {
1445				return Ok(None);
1446			}
1447		}
1448
1449		let min_index = match min_height {
1450			Some(0) => None,
1451			Some(h) => {
1452				if h > head.height {
1453					return Ok(None);
1454				}
1455				let header = self.get_header_by_height(h)?;
1456				let prev_header = self.get_previous_header(&header)?;
1457				Some(prev_header.kernel_mmr_size + 1)
1458			}
1459			None => None,
1460		};
1461
1462		let max_index = match max_height {
1463			Some(h) => {
1464				if h > head.height {
1465					None
1466				} else {
1467					let header = self.get_header_by_height(h)?;
1468					Some(header.kernel_mmr_size)
1469				}
1470			}
1471			None => None,
1472		};
1473
1474		let (kernel, mmr_index) = match self
1475			.txhashset
1476			.read()
1477			.find_kernel(&excess, min_index, max_index)
1478		{
1479			Some(k) => k,
1480			None => return Ok(None),
1481		};
1482
1483		let header = self.get_header_for_kernel_index(mmr_index, min_height, max_height)?;
1484
1485		Ok(Some((kernel, header.height, mmr_index)))
1486	}
	/// Gets the block header in which a given kernel mmr index appears in the txhashset.
	///
	/// Performs a binary search over header heights: a block "contains" the
	/// kernel when `h_prev.kernel_mmr_size < kernel_mmr_index <= h.kernel_mmr_size`.
	pub fn get_header_for_kernel_index(
		&self,
		kernel_mmr_index: u64,
		min_height: Option<u64>,
		max_height: Option<u64>,
	) -> Result<BlockHeader, Error> {
		let header_pmmr = self.header_pmmr.read();

		// Search bounds, defaulting to [genesis, chain head].
		let mut min = min_height.unwrap_or(0).saturating_sub(1);
		let mut max = match max_height {
			Some(h) => h,
			None => self.head()?.height,
		};

		loop {
			// Midpoint biased upward so the search always makes progress.
			let search_height = max - (max - min) / 2;
			let hash = header_pmmr.get_header_hash_by_height(search_height)?;
			let h = self.get_block_header(&hash)?;
			if search_height == 0 {
				return Ok(h);
			}
			let hash_prev = header_pmmr.get_header_hash_by_height(search_height - 1)?;
			let h_prev = self.get_block_header(&hash_prev)?;
			if kernel_mmr_index > h.kernel_mmr_size {
				// Kernel lies in a later block.
				min = search_height;
			} else if kernel_mmr_index < h_prev.kernel_mmr_size {
				// Kernel lies in an earlier block.
				max = search_height;
			} else {
				// Index falls within [h_prev.kernel_mmr_size, h.kernel_mmr_size]:
				// an index exactly equal to the previous block's size belongs to
				// the previous block (MMR sizes are cumulative).
				if kernel_mmr_index == h_prev.kernel_mmr_size {
					return Ok(h_prev);
				}
				return Ok(h);
			}
		}
	}
1523
1524	/// Verifies the given block header is actually on the current chain.
1525	/// Checks the header_by_height index to verify the header is where we say
1526	/// it is
1527	fn is_on_current_chain<T: Into<Tip>>(&self, x: T, head: Tip) -> Result<(), Error> {
1528		let x: Tip = x.into();
1529		if x.height > head.height {
1530			return Err(ErrorKind::Other("not on current chain".to_string()).into());
1531		}
1532
1533		if x.hash() == self.get_header_hash_by_height(x.height)? {
1534			Ok(())
1535		} else {
1536			Err(ErrorKind::Other("not on current chain".to_string()).into())
1537		}
1538	}
1539
1540	/// Gets multiple headers at the provided heights.
1541	/// Note: This is based on the provided sync_head to support syncing against a fork.
1542	pub fn get_locator_hashes(&self, sync_head: Tip, heights: &[u64]) -> Result<Vec<Hash>, Error> {
1543		let mut header_pmmr = self.header_pmmr.write();
1544		txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
1545			let header = batch.get_block_header(&sync_head.hash())?;
1546			self.rewind_and_apply_header_fork(&header, ext, batch)?;
1547
1548			let hashes = heights
1549				.iter()
1550				.filter_map(|h| ext.get_header_hash_by_height(*h))
1551				.collect();
1552
1553			Ok(hashes)
1554		})
1555	}
1556
1557	/// Builds an iterator on blocks starting from the current chain head and
1558	/// running backward. Specialized to return information pertaining to block
1559	/// difficulty calculation (timestamp and previous difficulties).
1560	pub fn difficulty_iter(&self) -> Result<store::DifficultyIter<'_>, Error> {
1561		let head = self.head()?;
1562		let store = self.store.clone();
1563		Ok(store::DifficultyIter::from(head.last_block_h, store))
1564	}
1565
1566	/// Check whether we have a block without reading it
1567	pub fn block_exists(&self, h: Hash) -> Result<bool, Error> {
1568		self.store
1569			.block_exists(&h)
1570			.map_err(|e| ErrorKind::StoreErr(e, "chain block exists".to_owned()).into())
1571	}
1572}
1573
/// Initializes head/tail state on node startup.
///
/// Ensures the genesis header is in the db and header PMMR, reconciles the
/// header PMMR with the db's header_head, then rewinds and validates the
/// txhashset against the current body head — rolling back one block at a
/// time if the MMR backend files were left corrupted by a previous shutdown.
/// If no head exists in the db the genesis block is applied and saved.
fn setup_head(
	genesis: &Block,
	store: &store::ChainStore,
	header_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
	txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
	let mut batch = store.batch()?;

	// Apply the genesis header to header MMR.
	{
		if batch.get_block_header(&genesis.hash()).is_err() {
			batch.save_block_header(&genesis.header)?;
		}

		// An empty header PMMR means a fresh node: seed it with genesis.
		if header_pmmr.size == 0 {
			txhashset::header_extending(header_pmmr, &mut batch, |ext, _| {
				ext.apply_header(&genesis.header)
			})?;
		}
	}

	// Make sure our header PMMR is consistent with header_head from db if it exists.
	// If header_head is missing in db then use head of header PMMR.
	if let Ok(head) = batch.header_head() {
		header_pmmr.init_head(&head)?;
		txhashset::header_extending(header_pmmr, &mut batch, |ext, batch| {
			let header = batch.get_block_header(&head.hash())?;
			ext.rewind(&header)
		})?;
	} else {
		let hash = header_pmmr.head_hash()?;
		let header = batch.get_block_header(&hash)?;
		batch.save_header_head(&Tip::from_header(&header))?;
	}

	// check if we have a head in store, otherwise the genesis block is it
	let head_res = batch.head();
	let mut head: Tip;
	match head_res {
		Ok(h) => {
			head = h;
			loop {
				// Use current chain tip if we have one.
				// Note: We are rewinding and validating against a writeable extension.
				// If validation is successful we will truncate the backend files
				// to match the provided block header.
				let header = batch.get_block_header(&head.last_block_h)?;

				let res = txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| {
					pipe::rewind_and_apply_fork(&header, ext, batch, &|_| Ok(()))?;

					let extension = &mut ext.extension;

					extension.validate_roots(&header)?;

					// now check we have the "block sums" for the block in question
					// if we have no sums (migrating an existing node) we need to go
					// back to the txhashset and sum the outputs and kernels
					if header.height > 0 && batch.get_block_sums(&header.hash()).is_err() {
						debug!(
							"init: building (missing) block sums for {} @ {}",
							header.height,
							header.hash()
						);

						// Do a full (and slow) validation of the txhashset extension
						// to calculate the utxo_sum and kernel_sum at this block height.
						let (utxo_sum, kernel_sum) =
							extension.validate_kernel_sums(&genesis.header, &header)?;

						// Save the block_sums to the db for use later.
						batch.save_block_sums(
							&header.hash(),
							BlockSums {
								utxo_sum,
								kernel_sum,
							},
						)?;
					}

					debug!(
						"init: rewinding and validating before we start... {} at {}",
						header.hash(),
						header.height,
					);
					Ok(())
				});

				if res.is_ok() {
					break;
				} else {
					// We may have corrupted the MMR backend files last time we stopped the
					// node. If this happens we rewind to the previous header,
					// delete the "bad" block and try again.
					let prev_header = batch.get_block_header(&head.prev_block_h)?;

					txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| {
						pipe::rewind_and_apply_fork(&prev_header, ext, batch, &|_| Ok(()))
					})?;

					// Now "undo" the latest block and forget it ever existed.
					// We will request it from a peer during sync as necessary.
					{
						let _ = batch.delete_block(&header.hash());
						head = Tip::from_header(&prev_header);
						batch.save_body_head(&head)?;
					}
				}
			}
		}
		Err(NotFoundErr(_)) => {
			let mut sums = BlockSums::default();

			// No chain head in the db yet: save the genesis block, its spent
			// index, and set it as the body head.
			batch.save_block(&genesis)?;
			batch.save_spent_index(&genesis.hash(), &vec![])?;
			batch.save_body_head(&Tip::from_header(&genesis.header))?;

			// If genesis carries kernels, derive the initial block sums from them.
			if !genesis.kernels().is_empty() {
				let (utxo_sum, kernel_sum) = (sums, genesis as &dyn Committed).verify_kernel_sums(
					genesis.header.overage(),
					genesis.header.total_kernel_offset(),
				)?;
				sums = BlockSums {
					utxo_sum,
					kernel_sum,
				};
			}
			txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| {
				ext.extension
					.apply_block(&genesis, ext.header_extension, batch)
			})?;

			// Save the block_sums to the db for use later.
			batch.save_block_sums(&genesis.hash(), sums)?;

			info!("init: saved genesis: {:?}", genesis.hash());
		}
		Err(e) => return Err(ErrorKind::StoreErr(e, "chain init load head".to_owned()).into()),
	};
	batch.commit()?;
	Ok(())
}