1use crate::blocks::{BlockParser, ParserIterator, ParserOptions, Pipeline};
4use anyhow::Result;
5use bitcoin::block::Header;
6use bitcoin::hashes::Hash;
7use bitcoin::{Block, OutPoint, Transaction, TxIn, TxOut, Txid};
8use dashmap::DashMap;
9use log::info;
10use rand::prelude::SmallRng;
11use rand::{Error, RngCore, SeedableRng};
12use scalable_cuckoo_filter::{DefaultHasher, ScalableCuckooFilter, ScalableCuckooFilterBuilder};
13use std::fs;
14use std::fs::File;
15use std::io::{BufReader, BufWriter};
16use std::iter::Zip;
17use std::slice::Iter;
18use std::sync::{Arc, Mutex};
19
20#[derive(Clone, Eq, PartialEq, Debug)]
22pub struct UtxoBlock {
23 pub header: Header,
25 pub txdata: Vec<UtxoTransaction>,
27}
28
29impl UtxoBlock {
30 fn new(block: Block) -> Self {
32 Self {
33 header: block.header,
34 txdata: block.txdata.into_iter().map(UtxoTransaction::new).collect(),
35 }
36 }
37
38 pub fn to_block(self) -> Block {
40 Block {
41 header: self.header,
42 txdata: self.txdata.into_iter().map(|tx| tx.transaction).collect(),
43 }
44 }
45}
46
47#[derive(Clone, Eq, PartialEq, Debug)]
49pub struct UtxoTransaction {
50 pub transaction: Transaction,
52 pub txid: Txid,
54 inputs: Vec<TxOut>,
56 outputs: Vec<OutputStatus>,
58}
59
60impl UtxoTransaction {
61 fn new(transaction: Transaction) -> UtxoTransaction {
63 Self {
64 txid: transaction.compute_txid(),
65 transaction,
66 inputs: vec![],
67 outputs: vec![],
68 }
69 }
70
71 pub fn input(&self) -> Zip<Iter<'_, TxIn>, Iter<'_, TxOut>> {
73 self.transaction.input.iter().zip(self.inputs.iter())
74 }
75
76 pub fn output(&self) -> Zip<Iter<'_, TxOut>, Iter<'_, OutputStatus>> {
78 self.transaction.output.iter().zip(self.outputs.iter())
79 }
80}
81
/// Whether a transaction output is still unspent according to the UTXO filter.
///
/// The filter is a cuckoo filter configured with a false-positive target, so a spent
/// output can, with very small probability, be reported as `Unspent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputStatus {
    /// The filter does not contain the output.
    Spent,
    /// The filter contains the output.
    Unspent,
}
90
91type ShortOutPoints = (Vec<ShortOutPoint>, Vec<ShortOutPoint>);
92type ShortOutPointFilter = ScalableCuckooFilter<ShortOutPoint, DefaultHasher, FastRng>;
93
94#[derive(Clone, Debug)]
145pub struct UtxoParser {
146 filter_file: String,
148 blocks_dir: String,
150 estimated_utxos: usize,
152 end_height: usize,
154 options: ParserOptions,
156}
157
158impl UtxoParser {
159 pub fn new(blocks_dir: &str, filter_file: &str) -> Self {
168 Self {
169 filter_file: filter_file.to_string(),
170 blocks_dir: blocks_dir.to_string(),
171 estimated_utxos: 250_000_000,
172 end_height: usize::MAX,
173 options: Default::default(),
174 }
175 }
176
177 pub fn estimated_utxos(mut self, estimated_utxos: usize) -> Self {
181 self.estimated_utxos = estimated_utxos;
182 self
183 }
184
185 pub fn end_height(mut self, end_height: usize) -> Self {
191 self.end_height = end_height;
192 self
193 }
194
195 pub fn with_opts(mut self, options: ParserOptions) -> Self {
197 self.options = options;
198 self
199 }
200
201 pub fn parse<T: Send + 'static>(
207 self,
208 extract: impl Fn(UtxoBlock) -> T + Clone + Send + 'static,
209 ) -> Result<ParserIterator<T>> {
210 if !fs::exists(&self.filter_file)? {
211 self.create_filter()?;
212 } else {
213 info!("Found UTXO filter '{}'", self.filter_file);
214 }
215
216 let reader = BufReader::new(File::open(&self.filter_file)?);
217 let filter = bincode::deserialize_from(reader)?;
218 let pipeline = UtxoPipeline::new(filter, extract);
219
220 Ok(
221 BlockParser::new_with_opts(&self.blocks_dir, self.options.clone())?
222 .end_height(self.end_height)
223 .parse(UtxoBlock::new)
224 .ordered()
225 .pipeline(&pipeline),
226 )
227 }
228
229 pub fn create_filter(&self) -> Result<Self> {
231 info!("Creating UTXO filter '{}'", self.filter_file);
232 let filter = UtxoFilter::new(self.estimated_utxos);
233 BlockParser::new_with_opts(&self.blocks_dir, self.options.clone())?
234 .end_height(self.end_height)
235 .parse(UtxoFilter::outpoints)
236 .ordered()
237 .map(&|outpoints| filter.update(outpoints))
238 .for_each(|_| {});
239
240 let filter = Arc::try_unwrap(filter.filter).expect("Arc still referenced");
241 let mut filter = Mutex::into_inner(filter)?;
242 filter.shrink_to_fit();
243 let writer = BufWriter::new(File::create(&self.filter_file)?);
244 bincode::serialize_into(writer, &filter)?;
245 info!("Finished creating UTXO filter '{}'", self.filter_file);
246 Ok(self.clone())
247 }
248}
249
250#[derive(Clone)]
252struct UtxoFilter {
253 filter: Arc<Mutex<ShortOutPointFilter>>,
254}
255
256impl UtxoFilter {
257 fn new(filter_capacity: usize) -> UtxoFilter {
259 Self {
260 filter: Arc::new(Mutex::new(
261 ScalableCuckooFilterBuilder::default()
262 .initial_capacity(filter_capacity)
263 .false_positive_probability(0.000_000_000_001)
264 .rng(FastRng::default())
265 .finish(),
266 )),
267 }
268 }
269
270 fn outpoints(block: Block) -> ShortOutPoints {
272 let mut inputs = vec![];
273 let mut outputs = vec![];
274 for tx in block.txdata.iter() {
275 let txid = tx.compute_txid();
276 for input in &tx.input {
277 inputs.push(ShortOutPoint::from_outpoint(&input.previous_output));
278 }
279
280 for (index, _) in tx.output.iter().enumerate() {
281 outputs.push(ShortOutPoint::new(index, &txid));
282 }
283 }
284 (inputs, outputs)
285 }
286
287 pub fn update(&self, outpoints: ShortOutPoints) {
289 let mut filter = self.filter.lock().expect("Lock poisoned");
290 let (inputs, outputs) = outpoints;
291 for outpoint in outputs {
292 filter.insert(&outpoint);
294 }
295 for input in inputs {
296 filter.remove(&input);
298 }
299 }
300}
301
302#[derive(Clone)]
304struct UtxoPipeline<F> {
305 filter: Arc<ShortOutPointFilter>,
307 outputs: Arc<DashMap<ShortOutPoint, TxOut>>,
309 extract: F,
311}
312
313impl<F> UtxoPipeline<F> {
314 fn new(filter: ShortOutPointFilter, extract: F) -> Self {
316 Self {
317 filter: Arc::new(filter),
318 outputs: Arc::new(DashMap::new()),
319 extract,
320 }
321 }
322
323 fn status(&self, outpoint: &ShortOutPoint) -> OutputStatus {
325 if self.filter.contains(outpoint) {
326 OutputStatus::Unspent
327 } else {
328 OutputStatus::Spent
329 }
330 }
331}
332
333impl<F, T> Pipeline<UtxoBlock, UtxoBlock, T> for UtxoPipeline<F>
334where
335 F: Fn(UtxoBlock) -> T + Clone + Send + 'static,
336{
337 fn first(&self, mut block: UtxoBlock) -> UtxoBlock {
338 for tx in &mut block.txdata {
339 for (index, output) in tx.transaction.output.iter().enumerate() {
340 let outpoint = ShortOutPoint::new(index, &tx.txid);
341 let status = self.status(&outpoint);
342 if status != OutputStatus::Unspent {
344 self.outputs.insert(outpoint, output.clone());
345 }
346 tx.outputs.push(status);
347 }
348 }
349 block
350 }
351
352 fn second(&self, mut block: UtxoBlock) -> T {
353 for tx in &mut block.txdata {
354 for input in tx.transaction.input.iter() {
355 if tx.transaction.is_coinbase() {
356 tx.inputs.push(TxOut::NULL);
358 } else {
359 let outpoint = ShortOutPoint::from_outpoint(&input.previous_output);
360 let (_, out) = self.outputs.remove(&outpoint).expect("Missing outpoint");
361 tx.inputs.push(out);
362 }
363 }
364 }
365 (self.extract)(block)
366 }
367}
368
369#[derive(Eq, PartialEq, Hash, Debug, Clone)]
374struct ShortOutPoint(pub Vec<u8>);
375impl ShortOutPoint {
376 fn from_outpoint(outpoint: &OutPoint) -> ShortOutPoint {
378 Self::new(outpoint.vout as usize, &outpoint.txid)
379 }
380
381 fn new(vout: usize, txid: &Txid) -> ShortOutPoint {
383 let mut bytes = vec![];
384 bytes.extend_from_slice(&vout.to_le_bytes()[0..2]);
385 bytes.extend_from_slice(&txid.as_byte_array()[0..12]);
386 ShortOutPoint(bytes)
387 }
388}
389
390#[derive(Debug)]
392struct FastRng(SmallRng);
393impl Default for FastRng {
394 fn default() -> Self {
395 Self(SmallRng::seed_from_u64(0x2c76c58e13b3a812))
396 }
397}
398impl RngCore for FastRng {
399 fn next_u32(&mut self) -> u32 {
400 self.0.next_u32()
401 }
402
403 fn next_u64(&mut self) -> u64 {
404 self.0.next_u64()
405 }
406
407 fn fill_bytes(&mut self, dest: &mut [u8]) {
408 self.0.fill_bytes(dest)
409 }
410
411 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), Error> {
412 self.0.try_fill_bytes(dest)
413 }
414}