use std::{collections::BTreeSet, fmt::Debug};
use bitcoin::{
consensus::{deserialize, serialize},
hashes::Hash,
Txid,
};
use bitcoin_slices::{bsl, Parse};
use log::*;
use rusqlite::OptionalExtension;
use crate::{
chain::{self, IndexedChain},
index::{self, ScriptHash},
Location,
};
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("rusqlite failed: {0}")]
Sqlite(#[from] rusqlite::Error),
#[error("invalid address: {0}")]
Address(#[from] bitcoin::address::ParseError),
#[error("chain error: {0}")]
Index(#[from] chain::Error),
}
pub struct Cache {
db: rusqlite::Connection,
}
impl Cache {
pub fn open(db: rusqlite::Connection) -> Result<Self, Error> {
db.execute("PRAGMA foreign_keys = ON", ())?;
let c = Cache { db };
c.run("create", || c.create_tables())?;
Ok(c)
}
fn run<T: Debug>(&self, op: &str, f: impl FnOnce() -> Result<T, Error>) -> Result<T, Error> {
let start = std::time::Instant::now();
self.db.execute("BEGIN", ())?;
let res = match f() {
Ok(v) => {
self.db.execute("COMMIT", ())?;
Ok(v)
}
Err(e) => {
self.db.execute("ROLLBACK", ())?;
Err(e)
}
};
debug!("DB {} took {:?}, result={:?}", op, start.elapsed(), res);
res
}
fn create_tables(&self) -> Result<(), Error> {
let statements = include_str!("schema.sql")
.split(";")
.map(str::trim)
.filter(|s| !str::is_empty(s));
for s in statements {
self.db.execute(s, ())?;
}
Ok(())
}
pub fn sync(&self, chain: &IndexedChain) -> Result<(), Error> {
self.run("sync", || {
let new_history = self.sync_history(chain)?;
let new_locations: BTreeSet<_> = new_history
.iter()
.flat_map(|(_, locations)| locations.iter())
.collect();
let new_headers: BTreeSet<_> = new_locations
.iter()
.map(|&loc| (loc.block_height, loc.indexed_header))
.collect();
if !new_locations.is_empty() {
info!(
"adding {} history entries, {} transactions, {} headers to cache={:?}",
new_history.len(),
new_locations.len(),
new_headers.len(),
self.db.path().unwrap_or("")
);
}
self.add_headers(new_headers.into_iter())?;
self.add_transactions(new_locations.into_iter(), chain)?;
self.add_history(new_history.into_iter())?;
Ok(())
})
}
fn drop_stale_blocks(&self, chain: &IndexedChain) -> Result<(), Error> {
let mut select = self
.db
.prepare("SELECT block_hash, block_height FROM headers ORDER BY block_height DESC")?;
let rows_iter = select.query_map((), |row| {
let hash = bitcoin::BlockHash::from_byte_array(row.get(0)?);
let height: usize = row.get(1)?;
Ok((hash, height))
})?;
let mut delete_from = None;
let headers = chain.headers();
for row in rows_iter {
let (hash, height) = row?;
match headers.get_header(hash, height) {
Ok(_header) => break,
Err(err) => {
warn!("reorg detected: {}", err);
delete_from = Some(height);
}
}
}
if let Some(height) = delete_from {
let mut delete = self
.db
.prepare("DELETE FROM headers WHERE block_height >= ?1")?;
delete.execute([height])?;
}
Ok(())
}
fn sync_history<'a>(
&self,
chain: &'a IndexedChain,
) -> Result<Vec<(ScriptHash, BTreeSet<Location<'a>>)>, Error> {
self.drop_stale_blocks(chain)?;
let mut stmt = self.db.prepare(
r"
WITH max_heights AS (
SELECT script_hash, max(block_height) AS `block_height`
FROM watch LEFT JOIN history USING (script_hash)
GROUP BY 1
)
SELECT script_hash, block_height, block_hash
FROM max_heights LEFT JOIN headers USING (block_height)",
)?;
let headers = chain.headers();
let rows_iter = stmt.query_map([], |row| {
let script_hash = ScriptHash::from_byte_array(row.get(0)?);
let block_height: Option<usize> = row.get(1)?;
let latest_header = if let Some(height) = block_height {
let block_hash = bitcoin::BlockHash::from_byte_array(row.get(2)?);
let header = headers
.get_header(block_hash, height)
.expect("unexpected stale block");
Some(header)
} else {
None
};
Ok((script_hash, latest_header))
})?;
rows_iter
.map(|row| {
let (script_hash, latest_header) = row?;
let locations = chain
.locations_by_scripthash(&script_hash, latest_header)?
.collect();
Ok((script_hash, locations))
})
.collect::<Result<_, Error>>()
}
fn add_headers<'a>(
&self,
entries: impl Iterator<Item = (usize, &'a index::IndexedHeader)>,
) -> Result<usize, Error> {
let mut insert = self
.db
.prepare("INSERT OR IGNORE INTO headers VALUES (?1, ?2, ?3)")?;
let mut rows = 0;
for (height, header) in entries {
rows += insert.execute((
height,
header.hash().as_byte_array(),
serialize(header.header()),
))?;
}
Ok(rows)
}
fn add_transactions<'a>(
&self,
locations: impl Iterator<Item = &'a Location<'a>>,
index: &IndexedChain,
) -> Result<usize, Error> {
let mut insert = self
.db
.prepare("INSERT OR IGNORE INTO transactions VALUES (?1, ?2, ?3, ?4)")?;
let mut rows = 0;
for loc in locations {
let tx_bytes = index.get_tx_bytes(loc).expect("missing tx bytes");
let parsed = bsl::Transaction::parse(&tx_bytes).expect("invalid tx");
let txid = Txid::from(parsed.parsed().txid()).to_byte_array();
rows += insert.execute((loc.block_height, loc.block_offset, txid, tx_bytes))?;
}
Ok(rows)
}
fn add_history<'a>(
&self,
entries: impl Iterator<Item = (ScriptHash, BTreeSet<Location<'a>>)>,
) -> Result<usize, Error> {
let mut insert = self
.db
.prepare("INSERT INTO history VALUES (?1, ?2, ?3, ?4, ?5, ?6)")?;
let mut rows = 0;
for (script_hash, locations) in entries {
for loc in locations {
let tx: bitcoin::Transaction = self.db.query_row(
"SELECT tx_bytes FROM transactions WHERE block_height = ?1 AND block_offset = ?2",
(loc.block_height, loc.block_offset),
|row| {
let tx_bytes: Vec<u8> = row.get(0)?;
Ok(deserialize(&tx_bytes).expect("invalid tx"))
},
)?;
for (i, input) in tx.input.iter().enumerate() {
let prevout = input.previous_output;
let result = self
.db
.query_row(
"SELECT block_height, block_offset FROM transactions WHERE tx_id = ?1",
[prevout.txid.as_byte_array()],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.optional()?;
let (height, offset): (usize, usize) = match result {
Some(v) => v,
None => continue,
};
let result: Option<i64> = self
.db
.query_row(
r"
SELECT amount
FROM history
WHERE script_hash = ?1
AND block_height = ?2
AND block_offset = ?3
AND is_output = TRUE
AND index_ = ?4",
(script_hash.as_byte_array(), height, offset, prevout.vout),
|row| row.get(0),
)
.optional()?;
if let Some(amount) = result {
assert!(amount > 0);
rows += insert.execute((
script_hash.as_byte_array(),
loc.block_height,
loc.block_offset,
false,
i,
-amount,
))?;
}
}
for (i, output) in tx.output.iter().enumerate() {
if script_hash == ScriptHash::new(&output.script_pubkey) {
rows += insert.execute((
script_hash.as_byte_array(),
loc.block_height,
loc.block_offset,
true,
i,
output.value.to_sat(),
))?;
}
}
}
}
Ok(rows)
}
pub fn add(&self, addresses: impl IntoIterator<Item = bitcoin::Address>) -> Result<(), Error> {
let mut insert = self
.db
.prepare("INSERT OR IGNORE INTO watch VALUES (?1, ?2, ?3)")?;
let mut rows = 0;
for addr in addresses {
let script = addr.script_pubkey();
let script_hash = ScriptHash::new(&script);
rows += insert.execute((
script_hash.as_byte_array(),
script.as_bytes(),
addr.to_string(),
))?;
}
if rows > 0 {
info!("added {} new addresses to watch", rows);
}
Ok(())
}
pub fn db(&self) -> &rusqlite::Connection {
&self.db
}
}