use async_trait::async_trait;
use chrono::Utc;
use sqlx::{Pool, Row, Sqlite, SqlitePool};
use std::sync::RwLock;
use tracing::{debug, info, warn};
use crate::chaintracks::{
calculate_work, BlockHeader, ChaintracksStorage, ChaintracksStorageIngest,
ChaintracksStorageQuery, HeightRange, InsertHeaderResult, LiveBlockHeader,
};
use crate::error::WalletResult;
use crate::types::Chain;
pub struct SqliteStorage {
pool: Pool<Sqlite>,
chain: Chain,
live_height_threshold: u32,
reorg_height_threshold: u32,
available: RwLock<bool>,
}
impl SqliteStorage {
pub async fn new(database_url: &str, chain: Chain) -> WalletResult<Self> {
let pool = SqlitePool::connect(database_url).await?;
Ok(Self {
pool,
chain,
live_height_threshold: 2000,
reorg_height_threshold: 400,
available: RwLock::new(false),
})
}
pub async fn with_thresholds(
database_url: &str,
chain: Chain,
live_height_threshold: u32,
reorg_height_threshold: u32,
) -> WalletResult<Self> {
let pool = SqlitePool::connect(database_url).await?;
Ok(Self {
pool,
chain,
live_height_threshold,
reorg_height_threshold,
available: RwLock::new(false),
})
}
pub async fn in_memory(chain: Chain) -> WalletResult<Self> {
Self::new("sqlite::memory:", chain).await
}
pub fn pool(&self) -> &Pool<Sqlite> {
&self.pool
}
async fn create_tables(&self) -> WalletResult<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS chaintracks_live_headers (
header_id INTEGER PRIMARY KEY AUTOINCREMENT,
previous_header_id INTEGER,
previous_hash TEXT NOT NULL,
height INTEGER NOT NULL,
is_active INTEGER NOT NULL DEFAULT 0,
is_chain_tip INTEGER NOT NULL DEFAULT 0,
hash TEXT NOT NULL UNIQUE,
chain_work TEXT NOT NULL,
version INTEGER NOT NULL,
merkle_root TEXT NOT NULL,
time INTEGER NOT NULL,
bits INTEGER NOT NULL,
nonce INTEGER NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (previous_header_id) REFERENCES chaintracks_live_headers(header_id)
)
"#,
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_live_headers_height ON chaintracks_live_headers(height)",
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_live_headers_active ON chaintracks_live_headers(is_active)",
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_live_headers_tip ON chaintracks_live_headers(is_chain_tip)",
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_live_headers_merkle ON chaintracks_live_headers(merkle_root) WHERE is_active = 1",
)
.execute(&self.pool)
.await?;
Ok(())
}
fn row_to_header(row: &sqlx::sqlite::SqliteRow) -> LiveBlockHeader {
let header_id: i64 = row.get("header_id");
let previous_header_id: Option<i64> = row.get("previous_header_id");
LiveBlockHeader {
header_id: Some(header_id as u64),
previous_header_id: previous_header_id.map(|v| v as u64),
previous_hash: row.get("previous_hash"),
height: row.get::<i64, _>("height") as u32,
is_active: row.get::<i32, _>("is_active") != 0,
is_chain_tip: row.get::<i32, _>("is_chain_tip") != 0,
hash: row.get("hash"),
chain_work: row.get("chain_work"),
version: row.get::<i64, _>("version") as u32,
merkle_root: row.get("merkle_root"),
time: row.get::<i64, _>("time") as u32,
bits: row.get::<i64, _>("bits") as u32,
nonce: row.get::<i64, _>("nonce") as u32,
}
}
async fn get_tip(&self) -> WalletResult<Option<LiveBlockHeader>> {
let row = sqlx::query(
r#"
SELECT * FROM chaintracks_live_headers
WHERE is_chain_tip = 1
LIMIT 1
"#,
)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|r| Self::row_to_header(&r)))
}
async fn set_tip(&self, header_id: i64) -> WalletResult<()> {
sqlx::query("UPDATE chaintracks_live_headers SET is_chain_tip = 0 WHERE is_chain_tip = 1")
.execute(&self.pool)
.await?;
sqlx::query("UPDATE chaintracks_live_headers SET is_chain_tip = 1 WHERE header_id = ?")
.bind(header_id)
.execute(&self.pool)
.await?;
Ok(())
}
async fn handle_reorg(
&self,
new_tip: &LiveBlockHeader,
old_tip: &LiveBlockHeader,
) -> WalletResult<Vec<LiveBlockHeader>> {
let mut deactivated = Vec::new();
let ancestor = self.find_common_ancestor(new_tip, old_tip).await?;
let ancestor_height = ancestor.as_ref().map(|a| a.height).unwrap_or(0);
let old_chain_rows = sqlx::query(
r#"
SELECT * FROM chaintracks_live_headers
WHERE is_active = 1 AND height > ?
ORDER BY height DESC
"#,
)
.bind(ancestor_height as i64)
.fetch_all(&self.pool)
.await?;
for row in old_chain_rows {
let header = Self::row_to_header(&row);
deactivated.push(header.clone());
sqlx::query("UPDATE chaintracks_live_headers SET is_active = 0 WHERE header_id = ?")
.bind(header.header_id.map(|v| v as i64))
.execute(&self.pool)
.await?;
}
let mut current = Some(new_tip.clone());
while let Some(header) = current {
if header.height <= ancestor_height {
break;
}
sqlx::query("UPDATE chaintracks_live_headers SET is_active = 1 WHERE header_id = ?")
.bind(header.header_id.map(|v| v as i64))
.execute(&self.pool)
.await?;
if let Some(prev_id) = header.previous_header_id {
let row = sqlx::query("SELECT * FROM chaintracks_live_headers WHERE header_id = ?")
.bind(prev_id as i64)
.fetch_optional(&self.pool)
.await?;
current = row.map(|r| Self::row_to_header(&r));
} else {
current = None;
}
}
info!(
"Reorg handled: deactivated {} headers, new tip at height {}",
deactivated.len(),
new_tip.height
);
Ok(deactivated)
}
pub async fn header_count(&self) -> WalletResult<usize> {
let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM chaintracks_live_headers")
.fetch_one(&self.pool)
.await?;
Ok(row.0 as usize)
}
pub async fn live_header_exists(&self, hash: &str) -> WalletResult<bool> {
let row: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM chaintracks_live_headers WHERE hash = ? LIMIT 1")
.bind(hash)
.fetch_one(&self.pool)
.await?;
Ok(row.0 > 0)
}
pub async fn find_headers_for_height_less_than_or_equal_sorted(
&self,
height: u32,
limit: u32,
) -> WalletResult<Vec<LiveBlockHeader>> {
let rows = sqlx::query(
r#"
SELECT * FROM chaintracks_live_headers
WHERE height <= ?
ORDER BY height ASC
LIMIT ?
"#,
)
.bind(height as i64)
.bind(limit as i64)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(Self::row_to_header).collect())
}
pub async fn delete_live_headers_by_ids(&self, ids: &[u64]) -> WalletResult<u32> {
if ids.is_empty() {
return Ok(0);
}
let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let clear_refs_sql = format!(
"UPDATE chaintracks_live_headers SET previous_header_id = NULL WHERE previous_header_id IN ({})",
placeholders
);
let mut clear_query = sqlx::query(&clear_refs_sql);
for id in ids {
clear_query = clear_query.bind(*id as i64);
}
clear_query.execute(&self.pool).await?;
let delete_sql = format!(
"DELETE FROM chaintracks_live_headers WHERE header_id IN ({})",
placeholders
);
let mut delete_query = sqlx::query(&delete_sql);
for id in ids {
delete_query = delete_query.bind(*id as i64);
}
let result = delete_query.execute(&self.pool).await?;
let count = result.rows_affected() as u32;
if count > 0 {
debug!("Deleted {} headers by IDs", count);
}
Ok(count)
}
pub async fn set_chain_tip_by_id(
&self,
header_id: u64,
is_chain_tip: bool,
) -> WalletResult<()> {
sqlx::query(
"UPDATE chaintracks_live_headers SET is_chain_tip = ?, updated_at = ? WHERE header_id = ?",
)
.bind(if is_chain_tip { 1 } else { 0 })
.bind(Utc::now().to_rfc3339())
.bind(header_id as i64)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn set_active_by_id(&self, header_id: u64, is_active: bool) -> WalletResult<()> {
sqlx::query(
"UPDATE chaintracks_live_headers SET is_active = ?, updated_at = ? WHERE header_id = ?",
)
.bind(if is_active { 1 } else { 0 })
.bind(Utc::now().to_rfc3339())
.bind(header_id as i64)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn insert_headers_batch(&self, headers: &[LiveBlockHeader]) -> WalletResult<u32> {
if headers.is_empty() {
return Ok(0);
}
let mut inserted = 0u32;
let now = Utc::now().to_rfc3339();
let mut tx = self.pool.begin().await?;
let chunk_size = 500;
let mut existing_hashes = std::collections::HashSet::new();
for chunk in headers.chunks(chunk_size) {
let placeholders: String = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let sql = format!(
"SELECT hash FROM chaintracks_live_headers WHERE hash IN ({})",
placeholders
);
let mut query = sqlx::query_scalar::<_, String>(&sql);
for h in chunk {
query = query.bind(&h.hash);
}
let existing: Vec<String> = query.fetch_all(&mut *tx).await?;
existing_hashes.extend(existing);
}
for header in headers {
if existing_hashes.contains(&header.hash) {
continue;
}
let chain_work = if header.chain_work.is_empty() || header.chain_work == "0" {
calculate_work(header.bits)
} else {
header.chain_work.clone()
};
let previous_header_id: Option<i64> = if header.previous_hash != "0".repeat(64) {
let row: Option<(i64,)> =
sqlx::query_as("SELECT header_id FROM chaintracks_live_headers WHERE hash = ?")
.bind(&header.previous_hash)
.fetch_optional(&mut *tx)
.await?;
row.map(|r| r.0)
} else {
None
};
sqlx::query(
r#"
INSERT INTO chaintracks_live_headers (
previous_header_id, previous_hash, height, is_active, is_chain_tip,
hash, chain_work, version, merkle_root, time, bits, nonce,
created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(previous_header_id)
.bind(&header.previous_hash)
.bind(header.height as i64)
.bind(1i32) .bind(0i32) .bind(&header.hash)
.bind(&chain_work)
.bind(header.version as i64)
.bind(&header.merkle_root)
.bind(header.time as i64)
.bind(header.bits as i64)
.bind(header.nonce as i64)
.bind(&now)
.bind(&now)
.execute(&mut *tx)
.await?;
inserted += 1;
}
tx.commit().await?;
if inserted > 0 {
info!("Batch inserted {} headers", inserted);
}
Ok(inserted)
}
pub async fn update_chain_tip_to_highest(&self) -> WalletResult<Option<LiveBlockHeader>> {
sqlx::query("UPDATE chaintracks_live_headers SET is_chain_tip = 0 WHERE is_chain_tip = 1")
.execute(&self.pool)
.await?;
let row = sqlx::query(
r#"
SELECT * FROM chaintracks_live_headers
WHERE is_active = 1
ORDER BY height DESC
LIMIT 1
"#,
)
.fetch_optional(&self.pool)
.await?;
match row {
Some(r) => {
let header = Self::row_to_header(&r);
sqlx::query(
"UPDATE chaintracks_live_headers SET is_chain_tip = 1, updated_at = ? WHERE header_id = ?",
)
.bind(Utc::now().to_rfc3339())
.bind(header.header_id.map(|v| v as i64))
.execute(&self.pool)
.await?;
debug!(
"Updated chain tip to height {} hash {}",
header.height,
&header.hash[..header.hash.len().min(16)]
);
Ok(Some(header))
}
None => Ok(None),
}
}
pub async fn get_headers_by_height_range(
&self,
start_height: u32,
end_height: u32,
) -> WalletResult<Vec<LiveBlockHeader>> {
let rows = sqlx::query(
r#"
SELECT * FROM chaintracks_live_headers
WHERE height >= ? AND height <= ? AND is_active = 1
ORDER BY height ASC
"#,
)
.bind(start_height as i64)
.bind(end_height as i64)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(Self::row_to_header).collect())
}
pub async fn get_headers_at_height(&self, height: u32) -> WalletResult<Vec<LiveBlockHeader>> {
let rows = sqlx::query("SELECT * FROM chaintracks_live_headers WHERE height = ?")
.bind(height as i64)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(Self::row_to_header).collect())
}
pub async fn get_active_headers(&self) -> WalletResult<Vec<LiveBlockHeader>> {
let rows = sqlx::query(
"SELECT * FROM chaintracks_live_headers WHERE is_active = 1 ORDER BY height ASC",
)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(Self::row_to_header).collect())
}
pub async fn get_fork_headers(&self) -> WalletResult<Vec<LiveBlockHeader>> {
let rows = sqlx::query(
"SELECT * FROM chaintracks_live_headers WHERE is_active = 0 ORDER BY height ASC",
)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(Self::row_to_header).collect())
}
pub async fn find_children(&self, parent_hash: &str) -> WalletResult<Vec<LiveBlockHeader>> {
let rows = sqlx::query("SELECT * FROM chaintracks_live_headers WHERE previous_hash = ?")
.bind(parent_hash)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(Self::row_to_header).collect())
}
pub async fn mark_headers_inactive_above_height(&self, height: u32) -> WalletResult<u32> {
let result = sqlx::query(
"UPDATE chaintracks_live_headers SET is_active = 0, is_chain_tip = 0, updated_at = ? WHERE height >= ? AND is_active = 1",
)
.bind(Utc::now().to_rfc3339())
.bind(height as i64)
.execute(&self.pool)
.await?;
let count = result.rows_affected() as u32;
if count > 0 {
info!("Marked {} headers inactive above height {}", count, height);
}
Ok(count)
}
}
#[async_trait]
impl ChaintracksStorageQuery for SqliteStorage {
fn chain(&self) -> Chain {
self.chain.clone()
}
fn live_height_threshold(&self) -> u32 {
self.live_height_threshold
}
fn reorg_height_threshold(&self) -> u32 {
self.reorg_height_threshold
}
async fn find_chain_tip_header(&self) -> WalletResult<Option<LiveBlockHeader>> {
self.get_tip().await
}
async fn find_chain_tip_hash(&self) -> WalletResult<Option<String>> {
Ok(self.get_tip().await?.map(|h| h.hash))
}
async fn find_header_for_height(&self, height: u32) -> WalletResult<Option<BlockHeader>> {
let row = sqlx::query(
r#"
SELECT * FROM chaintracks_live_headers
WHERE height = ? AND is_active = 1
LIMIT 1
"#,
)
.bind(height as i64)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|r| Self::row_to_header(&r).into()))
}
async fn find_live_header_for_block_hash(
&self,
hash: &str,
) -> WalletResult<Option<LiveBlockHeader>> {
let row = sqlx::query("SELECT * FROM chaintracks_live_headers WHERE hash = ?")
.bind(hash)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|r| Self::row_to_header(&r)))
}
async fn find_live_header_for_merkle_root(
&self,
merkle_root: &str,
) -> WalletResult<Option<LiveBlockHeader>> {
let row = sqlx::query(
r#"
SELECT * FROM chaintracks_live_headers
WHERE merkle_root = ? AND is_active = 1
LIMIT 1
"#,
)
.bind(merkle_root)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|r| Self::row_to_header(&r)))
}
async fn get_headers_bytes(&self, height: u32, count: u32) -> WalletResult<Vec<u8>> {
let rows = sqlx::query(
r#"
SELECT * FROM chaintracks_live_headers
WHERE height >= ? AND height < ? AND is_active = 1
ORDER BY height ASC
"#,
)
.bind(height as i64)
.bind((height + count) as i64)
.fetch_all(&self.pool)
.await?;
let mut bytes = Vec::with_capacity(rows.len() * 80);
for row in rows {
let header = Self::row_to_header(&row);
bytes.extend_from_slice(&header.version.to_le_bytes());
if let Ok(prev) = hex::decode(&header.previous_hash) {
if prev.len() == 32 {
bytes.extend_from_slice(&prev);
} else {
bytes.extend_from_slice(&[0u8; 32]);
}
} else {
bytes.extend_from_slice(&[0u8; 32]);
}
if let Ok(merkle) = hex::decode(&header.merkle_root) {
if merkle.len() == 32 {
bytes.extend_from_slice(&merkle);
} else {
bytes.extend_from_slice(&[0u8; 32]);
}
} else {
bytes.extend_from_slice(&[0u8; 32]);
}
bytes.extend_from_slice(&header.time.to_le_bytes());
bytes.extend_from_slice(&header.bits.to_le_bytes());
bytes.extend_from_slice(&header.nonce.to_le_bytes());
}
Ok(bytes)
}
async fn get_live_headers(&self) -> WalletResult<Vec<LiveBlockHeader>> {
let rows = sqlx::query("SELECT * FROM chaintracks_live_headers ORDER BY height DESC")
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(Self::row_to_header).collect())
}
async fn get_available_height_ranges(&self) -> WalletResult<Vec<HeightRange>> {
Ok(vec![])
}
async fn find_live_height_range(&self) -> WalletResult<Option<HeightRange>> {
let row: Option<(Option<i64>, Option<i64>)> = sqlx::query_as(
r#"
SELECT MIN(height), MAX(height)
FROM chaintracks_live_headers
WHERE is_active = 1
"#,
)
.fetch_optional(&self.pool)
.await?;
match row {
Some((Some(min), Some(max))) => Ok(Some(HeightRange::new(min as u32, max as u32))),
_ => Ok(None),
}
}
async fn find_common_ancestor(
&self,
header1: &LiveBlockHeader,
header2: &LiveBlockHeader,
) -> WalletResult<Option<LiveBlockHeader>> {
let mut h1 = Some(header1.clone());
let mut h2 = Some(header2.clone());
while let (Some(ref a), Some(ref b)) = (&h1, &h2) {
if a.hash == b.hash {
return Ok(h1);
}
match a.height.cmp(&b.height) {
std::cmp::Ordering::Greater => {
h1 = if let Some(prev_id) = a.previous_header_id {
let row = sqlx::query(
"SELECT * FROM chaintracks_live_headers WHERE header_id = ?",
)
.bind(prev_id as i64)
.fetch_optional(&self.pool)
.await?;
row.map(|r| Self::row_to_header(&r))
} else {
None
};
}
std::cmp::Ordering::Less => {
h2 = if let Some(prev_id) = b.previous_header_id {
let row = sqlx::query(
"SELECT * FROM chaintracks_live_headers WHERE header_id = ?",
)
.bind(prev_id as i64)
.fetch_optional(&self.pool)
.await?;
row.map(|r| Self::row_to_header(&r))
} else {
None
};
}
std::cmp::Ordering::Equal => {
h1 = if let Some(prev_id) = a.previous_header_id {
let row = sqlx::query(
"SELECT * FROM chaintracks_live_headers WHERE header_id = ?",
)
.bind(prev_id as i64)
.fetch_optional(&self.pool)
.await?;
row.map(|r| Self::row_to_header(&r))
} else {
None
};
h2 = if let Some(prev_id) = b.previous_header_id {
let row = sqlx::query(
"SELECT * FROM chaintracks_live_headers WHERE header_id = ?",
)
.bind(prev_id as i64)
.fetch_optional(&self.pool)
.await?;
row.map(|r| Self::row_to_header(&r))
} else {
None
};
}
}
}
Ok(None)
}
async fn find_reorg_depth(&self, new_header: &LiveBlockHeader) -> WalletResult<u32> {
let tip = self.get_tip().await?;
match tip {
None => Ok(0),
Some(current_tip) => {
if new_header.previous_hash == current_tip.hash {
Ok(0)
} else {
let ancestor = self.find_common_ancestor(new_header, ¤t_tip).await?;
match ancestor {
Some(a) => Ok(current_tip.height - a.height),
None => Ok(current_tip.height),
}
}
}
}
}
}
#[async_trait]
impl ChaintracksStorageIngest for SqliteStorage {
async fn insert_header(&self, mut header: LiveBlockHeader) -> WalletResult<InsertHeaderResult> {
let existing = self.find_live_header_for_block_hash(&header.hash).await?;
if existing.is_some() {
return Ok(InsertHeaderResult {
added: false,
dupe: true,
..Default::default()
});
}
if header.chain_work.is_empty() || header.chain_work == "0" {
header.chain_work = calculate_work(header.bits);
}
let previous_header = if header.previous_hash != "0".repeat(64) {
self.find_live_header_for_block_hash(&header.previous_hash)
.await?
} else {
None
};
let previous_header_id_i64: Option<i64> = previous_header
.as_ref()
.and_then(|h| h.header_id.map(|v| v as i64));
let current_tip = self.get_tip().await?;
let becomes_tip = match ¤t_tip {
None => true,
Some(tip) => header.height > tip.height,
};
let now = Utc::now().to_rfc3339();
let result = sqlx::query(
r#"
INSERT INTO chaintracks_live_headers (
previous_header_id, previous_hash, height, is_active, is_chain_tip,
hash, chain_work, version, merkle_root, time, bits, nonce,
created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(previous_header_id_i64)
.bind(&header.previous_hash)
.bind(header.height as i64)
.bind(if becomes_tip { 1 } else { 0 })
.bind(if becomes_tip { 1 } else { 0 })
.bind(&header.hash)
.bind(&header.chain_work)
.bind(header.version as i64)
.bind(&header.merkle_root)
.bind(header.time as i64)
.bind(header.bits as i64)
.bind(header.nonce as i64)
.bind(&now)
.bind(&now)
.execute(&self.pool)
.await?;
let header_id = result.last_insert_rowid();
header.header_id = Some(header_id as u64);
header.previous_header_id = previous_header_id_i64.map(|v| v as u64);
let mut insert_result = InsertHeaderResult {
added: true,
no_prev: previous_header.is_none() && header.height > 0,
no_tip: current_tip.is_none(),
is_active_tip: becomes_tip,
..Default::default()
};
if becomes_tip {
if let Some(ref tip) = current_tip {
if header.previous_hash != tip.hash {
let deactivated = self.handle_reorg(&header, tip).await?;
insert_result.reorg_depth = deactivated.len() as u32;
insert_result.deactivated_headers =
deactivated.into_iter().map(BlockHeader::from).collect();
insert_result.prior_tip = Some(BlockHeader::from(tip.clone()));
}
}
self.set_tip(header_id).await?;
sqlx::query("UPDATE chaintracks_live_headers SET is_active = 1 WHERE header_id = ?")
.bind(header_id)
.execute(&self.pool)
.await?;
}
debug!(
"Inserted header at height {} with hash {}",
header.height,
&header.hash[..header.hash.len().min(16)]
);
Ok(insert_result)
}
async fn prune_live_block_headers(&self, active_tip_height: u32) -> WalletResult<u32> {
let threshold = active_tip_height.saturating_sub(self.live_height_threshold);
sqlx::query(
r#"
UPDATE chaintracks_live_headers
SET previous_header_id = NULL
WHERE previous_header_id IN (
SELECT header_id FROM chaintracks_live_headers
WHERE is_active = 0 AND height < ?
)
"#,
)
.bind(threshold as i64)
.execute(&self.pool)
.await?;
let result = sqlx::query(
r#"
DELETE FROM chaintracks_live_headers
WHERE is_active = 0 AND height < ?
"#,
)
.bind(threshold as i64)
.execute(&self.pool)
.await?;
let count = result.rows_affected() as u32;
if count > 0 {
info!(
"Pruned {} inactive headers below height {}",
count, threshold
);
}
Ok(count)
}
async fn migrate_live_to_bulk(&self, _count: u32) -> WalletResult<u32> {
Ok(0)
}
async fn delete_older_live_block_headers(&self, max_height: u32) -> WalletResult<u32> {
sqlx::query(
r#"
UPDATE chaintracks_live_headers
SET previous_header_id = NULL
WHERE previous_header_id IN (
SELECT header_id FROM chaintracks_live_headers
WHERE height <= ?
)
"#,
)
.bind(max_height as i64)
.execute(&self.pool)
.await?;
let result = sqlx::query("DELETE FROM chaintracks_live_headers WHERE height <= ?")
.bind(max_height as i64)
.execute(&self.pool)
.await?;
let count = result.rows_affected() as u32;
if count > 0 {
warn!(
"Deleted {} headers at or below height {}",
count, max_height
);
}
Ok(count)
}
async fn make_available(&self) -> WalletResult<()> {
let mut available = self.available.write().unwrap();
*available = true;
Ok(())
}
async fn migrate_latest(&self) -> WalletResult<()> {
self.create_tables().await
}
async fn drop_all_data(&self) -> WalletResult<()> {
sqlx::query("DELETE FROM chaintracks_live_headers")
.execute(&self.pool)
.await?;
Ok(())
}
async fn destroy(&self) -> WalletResult<()> {
self.drop_all_data().await
}
}
#[async_trait]
impl ChaintracksStorage for SqliteStorage {
fn storage_type(&self) -> &str {
"sqlite"
}
async fn is_available(&self) -> bool {
*self.available.read().unwrap()
}
}
#[cfg(test)]
mod tests {
use super::*;
async fn create_test_storage() -> SqliteStorage {
let storage = SqliteStorage::in_memory(Chain::Test).await.unwrap();
storage.migrate_latest().await.unwrap();
storage.make_available().await.unwrap();
storage
}
fn create_test_header(height: u32, prev_hash: &str, hash: &str) -> LiveBlockHeader {
LiveBlockHeader {
header_id: None,
previous_header_id: None,
previous_hash: prev_hash.to_string(),
height,
is_active: false,
is_chain_tip: false,
hash: hash.to_string(),
chain_work: "".to_string(),
version: 1,
merkle_root: format!("merkle_{}", hash),
time: 1234567890 + height,
bits: 0x1d00ffff,
nonce: 12345,
}
}
#[tokio::test]
async fn test_storage_type() {
let storage = create_test_storage().await;
assert_eq!(storage.storage_type(), "sqlite");
}
#[tokio::test]
async fn test_is_available() {
let storage = create_test_storage().await;
assert!(storage.is_available().await);
}
#[tokio::test]
async fn test_insert_header() {
let storage = create_test_storage().await;
let header = create_test_header(0, &"0".repeat(64), "hash_0");
let result = storage.insert_header(header).await.unwrap();
assert!(result.added);
assert!(!result.dupe);
assert!(result.is_active_tip);
assert!(result.no_tip);
}
#[tokio::test]
async fn test_duplicate_detection() {
let storage = create_test_storage().await;
let header = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(header.clone()).await.unwrap();
let result = storage.insert_header(header).await.unwrap();
assert!(!result.added);
assert!(result.dupe);
}
#[tokio::test]
async fn test_find_by_hash() {
let storage = create_test_storage().await;
let header = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(header).await.unwrap();
let found = storage
.find_live_header_for_block_hash("hash_0")
.await
.unwrap();
assert!(found.is_some());
assert_eq!(found.unwrap().height, 0);
}
#[tokio::test]
async fn test_find_by_height() {
let storage = create_test_storage().await;
let header = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(header).await.unwrap();
let found = storage.find_header_for_height(0).await.unwrap();
assert!(found.is_some());
assert_eq!(found.unwrap().hash, "hash_0");
}
#[tokio::test]
async fn test_chain_growth() {
let storage = create_test_storage().await;
let genesis = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(genesis).await.unwrap();
let block1 = create_test_header(1, "hash_0", "hash_1");
let result = storage.insert_header(block1).await.unwrap();
assert!(result.added);
assert!(result.is_active_tip);
assert_eq!(result.reorg_depth, 0);
let tip = storage.find_chain_tip_header().await.unwrap().unwrap();
assert_eq!(tip.height, 1);
assert_eq!(tip.hash, "hash_1");
}
#[tokio::test]
async fn test_find_merkle_root() {
let storage = create_test_storage().await;
let header = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(header).await.unwrap();
let found = storage
.find_live_header_for_merkle_root("merkle_hash_0")
.await
.unwrap();
assert!(found.is_some());
assert_eq!(found.unwrap().hash, "hash_0");
}
#[tokio::test]
async fn test_prune_inactive() {
let storage = create_test_storage().await;
let genesis = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(genesis).await.unwrap();
let block1 = create_test_header(1, "hash_0", "hash_1");
storage.insert_header(block1).await.unwrap();
sqlx::query("UPDATE chaintracks_live_headers SET is_active = 0 WHERE hash = 'hash_0'")
.execute(storage.pool())
.await
.unwrap();
let pruned = storage.prune_live_block_headers(2002).await.unwrap();
assert_eq!(pruned, 1);
}
#[tokio::test]
async fn test_drop_all_data() {
let storage = create_test_storage().await;
let header = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(header).await.unwrap();
assert_eq!(storage.header_count().await.unwrap(), 1);
storage.drop_all_data().await.unwrap();
assert_eq!(storage.header_count().await.unwrap(), 0);
}
#[tokio::test]
async fn test_find_live_height_range() {
let storage = create_test_storage().await;
let range = storage.find_live_height_range().await.unwrap();
assert!(range.is_none());
let genesis = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(genesis).await.unwrap();
let block1 = create_test_header(1, "hash_0", "hash_1");
storage.insert_header(block1).await.unwrap();
let block2 = create_test_header(2, "hash_1", "hash_2");
storage.insert_header(block2).await.unwrap();
let range = storage.find_live_height_range().await.unwrap().unwrap();
assert_eq!(range.low, 0);
assert_eq!(range.high, 2);
}
#[tokio::test]
async fn test_get_headers_bytes() {
let storage = create_test_storage().await;
let genesis = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(genesis).await.unwrap();
let bytes = storage.get_headers_bytes(0, 1).await.unwrap();
assert_eq!(bytes.len(), 80);
}
#[tokio::test]
async fn test_live_header_exists() {
let storage = create_test_storage().await;
assert!(!storage.live_header_exists("hash_0").await.unwrap());
let header = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(header).await.unwrap();
assert!(storage.live_header_exists("hash_0").await.unwrap());
assert!(!storage.live_header_exists("nonexistent").await.unwrap());
}
#[tokio::test]
async fn test_find_headers_for_height_less_than_or_equal_sorted() {
let storage = create_test_storage().await;
for i in 0..5 {
let prev_hash = if i == 0 {
"0".repeat(64)
} else {
format!("hash_{}", i - 1)
};
let header = create_test_header(i, &prev_hash, &format!("hash_{}", i));
storage.insert_header(header).await.unwrap();
}
let headers = storage
.find_headers_for_height_less_than_or_equal_sorted(2, 10)
.await
.unwrap();
assert_eq!(headers.len(), 3); assert_eq!(headers[0].height, 0);
assert_eq!(headers[1].height, 1);
assert_eq!(headers[2].height, 2);
}
#[tokio::test]
async fn test_find_headers_with_limit() {
let storage = create_test_storage().await;
for i in 0..10 {
let prev_hash = if i == 0 {
"0".repeat(64)
} else {
format!("hash_{}", i - 1)
};
let header = create_test_header(i, &prev_hash, &format!("hash_{}", i));
storage.insert_header(header).await.unwrap();
}
let headers = storage
.find_headers_for_height_less_than_or_equal_sorted(9, 3)
.await
.unwrap();
assert_eq!(headers.len(), 3);
}
#[tokio::test]
async fn test_delete_live_headers_by_ids() {
let storage = create_test_storage().await;
let h0 = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(h0).await.unwrap();
let h1 = create_test_header(1, "hash_0", "hash_1");
storage.insert_header(h1).await.unwrap();
let h2 = create_test_header(2, "hash_1", "hash_2");
storage.insert_header(h2).await.unwrap();
assert_eq!(storage.header_count().await.unwrap(), 3);
let header0 = storage
.find_live_header_for_block_hash("hash_0")
.await
.unwrap()
.unwrap();
let header1 = storage
.find_live_header_for_block_hash("hash_1")
.await
.unwrap()
.unwrap();
let deleted = storage
.delete_live_headers_by_ids(&[header0.header_id.unwrap(), header1.header_id.unwrap()])
.await
.unwrap();
assert_eq!(deleted, 2);
assert_eq!(storage.header_count().await.unwrap(), 1);
let remaining = storage
.find_live_header_for_block_hash("hash_2")
.await
.unwrap();
assert!(remaining.is_some());
}
#[tokio::test]
async fn test_delete_empty_ids() {
let storage = create_test_storage().await;
let deleted = storage.delete_live_headers_by_ids(&[]).await.unwrap();
assert_eq!(deleted, 0);
}
#[tokio::test]
async fn test_set_chain_tip_by_id() {
let storage = create_test_storage().await;
let header = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(header).await.unwrap();
let h = storage
.find_live_header_for_block_hash("hash_0")
.await
.unwrap()
.unwrap();
assert!(h.is_chain_tip);
storage
.set_chain_tip_by_id(h.header_id.unwrap(), false)
.await
.unwrap();
let h = storage
.find_live_header_for_block_hash("hash_0")
.await
.unwrap()
.unwrap();
assert!(!h.is_chain_tip);
storage
.set_chain_tip_by_id(h.header_id.unwrap(), true)
.await
.unwrap();
let h = storage
.find_live_header_for_block_hash("hash_0")
.await
.unwrap()
.unwrap();
assert!(h.is_chain_tip);
}
#[tokio::test]
async fn test_set_active_by_id() {
let storage = create_test_storage().await;
let header = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(header).await.unwrap();
let h = storage
.find_live_header_for_block_hash("hash_0")
.await
.unwrap()
.unwrap();
assert!(h.is_active);
storage
.set_active_by_id(h.header_id.unwrap(), false)
.await
.unwrap();
let h = storage
.find_live_header_for_block_hash("hash_0")
.await
.unwrap()
.unwrap();
assert!(!h.is_active);
}
#[tokio::test]
async fn test_batch_insert() {
let storage = create_test_storage().await;
let mut headers = Vec::new();
for i in 0..100 {
let prev_hash = if i == 0 {
"0".repeat(64)
} else {
format!("hash_{:04}", i - 1)
};
headers.push(create_test_header(i, &prev_hash, &format!("hash_{:04}", i)));
}
let inserted = storage.insert_headers_batch(&headers).await.unwrap();
assert_eq!(inserted, 100);
assert_eq!(storage.header_count().await.unwrap(), 100);
let tip = storage
.update_chain_tip_to_highest()
.await
.unwrap()
.unwrap();
assert_eq!(tip.height, 99);
}
#[tokio::test]
async fn test_batch_insert_with_duplicates() {
let storage = create_test_storage().await;
let h0 = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(h0).await.unwrap();
let h1 = create_test_header(1, "hash_0", "hash_1");
storage.insert_header(h1).await.unwrap();
let headers = vec![
create_test_header(0, &"0".repeat(64), "hash_0"), create_test_header(1, "hash_0", "hash_1"), create_test_header(2, "hash_1", "hash_2"), create_test_header(3, "hash_2", "hash_3"), ];
let inserted = storage.insert_headers_batch(&headers).await.unwrap();
assert_eq!(inserted, 2); assert_eq!(storage.header_count().await.unwrap(), 4);
}
#[tokio::test]
async fn test_batch_insert_empty() {
let storage = create_test_storage().await;
let inserted = storage.insert_headers_batch(&[]).await.unwrap();
assert_eq!(inserted, 0);
}
#[tokio::test]
async fn test_update_chain_tip_to_highest() {
let storage = create_test_storage().await;
let headers = vec![
create_test_header(0, &"0".repeat(64), "hash_0"),
create_test_header(1, "hash_0", "hash_1"),
create_test_header(2, "hash_1", "hash_2"),
];
storage.insert_headers_batch(&headers).await.unwrap();
let tip = storage
.update_chain_tip_to_highest()
.await
.unwrap()
.unwrap();
assert_eq!(tip.height, 2);
assert_eq!(tip.hash, "hash_2");
let fetched_tip = storage.find_chain_tip_header().await.unwrap().unwrap();
assert_eq!(fetched_tip.height, 2);
}
#[tokio::test]
async fn test_update_chain_tip_empty_storage() {
let storage = create_test_storage().await;
let tip = storage.update_chain_tip_to_highest().await.unwrap();
assert!(tip.is_none());
}
#[tokio::test]
async fn test_get_headers_by_height_range() {
let storage = create_test_storage().await;
for i in 0..10 {
let prev_hash = if i == 0 {
"0".repeat(64)
} else {
format!("hash_{}", i - 1)
};
let header = create_test_header(i, &prev_hash, &format!("hash_{}", i));
storage.insert_header(header).await.unwrap();
}
let headers = storage.get_headers_by_height_range(3, 7).await.unwrap();
assert_eq!(headers.len(), 5);
assert_eq!(headers[0].height, 3);
assert_eq!(headers[4].height, 7);
}
#[tokio::test]
async fn test_get_headers_at_height() {
let storage = create_test_storage().await;
let h0 = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(h0).await.unwrap();
let headers = storage.get_headers_at_height(0).await.unwrap();
assert_eq!(headers.len(), 1);
let headers = storage.get_headers_at_height(1).await.unwrap();
assert!(headers.is_empty());
}
#[tokio::test]
async fn test_get_active_headers() {
let storage = create_test_storage().await;
for i in 0..3 {
let prev_hash = if i == 0 {
"0".repeat(64)
} else {
format!("hash_{}", i - 1)
};
let header = create_test_header(i, &prev_hash, &format!("hash_{}", i));
storage.insert_header(header).await.unwrap();
}
let h1 = storage
.find_live_header_for_block_hash("hash_1")
.await
.unwrap()
.unwrap();
storage
.set_active_by_id(h1.header_id.unwrap(), false)
.await
.unwrap();
let active = storage.get_active_headers().await.unwrap();
assert_eq!(active.len(), 2);
}
#[tokio::test]
async fn test_get_fork_headers() {
let storage = create_test_storage().await;
for i in 0..3 {
let prev_hash = if i == 0 {
"0".repeat(64)
} else {
format!("hash_{}", i - 1)
};
let header = create_test_header(i, &prev_hash, &format!("hash_{}", i));
storage.insert_header(header).await.unwrap();
}
let forks = storage.get_fork_headers().await.unwrap();
assert!(forks.is_empty());
let h1 = storage
.find_live_header_for_block_hash("hash_1")
.await
.unwrap()
.unwrap();
storage
.set_active_by_id(h1.header_id.unwrap(), false)
.await
.unwrap();
let forks = storage.get_fork_headers().await.unwrap();
assert_eq!(forks.len(), 1);
assert_eq!(forks[0].hash, "hash_1");
}
#[tokio::test]
async fn test_find_children() {
let storage = create_test_storage().await;
let h0 = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(h0).await.unwrap();
let h1 = create_test_header(1, "hash_0", "hash_1");
storage.insert_header(h1).await.unwrap();
let children = storage.find_children("hash_0").await.unwrap();
assert_eq!(children.len(), 1);
assert_eq!(children[0].hash, "hash_1");
let no_children = storage.find_children("hash_1").await.unwrap();
assert!(no_children.is_empty());
}
#[tokio::test]
async fn test_mark_headers_inactive_above_height() {
let storage = create_test_storage().await;
for i in 0..5 {
let prev_hash = if i == 0 {
"0".repeat(64)
} else {
format!("hash_{}", i - 1)
};
let header = create_test_header(i, &prev_hash, &format!("hash_{}", i));
storage.insert_header(header).await.unwrap();
}
let marked = storage.mark_headers_inactive_above_height(3).await.unwrap();
assert_eq!(marked, 2);
let h2 = storage
.find_live_header_for_block_hash("hash_2")
.await
.unwrap()
.unwrap();
assert!(h2.is_active);
let h3 = storage
.find_live_header_for_block_hash("hash_3")
.await
.unwrap()
.unwrap();
assert!(!h3.is_active);
let h4 = storage
.find_live_header_for_block_hash("hash_4")
.await
.unwrap()
.unwrap();
assert!(!h4.is_active);
}
#[tokio::test]
async fn test_reorg_handling() {
let storage = create_test_storage().await;
let h0 = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(h0).await.unwrap();
let h1 = create_test_header(1, "hash_0", "hash_1");
storage.insert_header(h1).await.unwrap();
let h2 = create_test_header(2, "hash_1", "hash_2");
storage.insert_header(h2).await.unwrap();
let tip = storage.find_chain_tip_header().await.unwrap().unwrap();
assert_eq!(tip.height, 2);
let fork1 = create_test_header(1, "hash_0", "fork_hash_1");
let fork2 = create_test_header(2, "fork_hash_1", "fork_hash_2");
let fork3 = create_test_header(3, "fork_hash_2", "fork_hash_3");
storage.insert_header(fork1).await.unwrap();
storage.insert_header(fork2).await.unwrap();
let result = storage.insert_header(fork3).await.unwrap();
assert!(result.added);
assert!(result.is_active_tip);
assert!(result.reorg_depth > 0);
let new_tip = storage.find_chain_tip_header().await.unwrap().unwrap();
assert_eq!(new_tip.height, 3);
assert_eq!(new_tip.hash, "fork_hash_3");
}
#[tokio::test]
async fn test_empty_database_queries() {
let storage = create_test_storage().await;
assert!(storage.find_chain_tip_header().await.unwrap().is_none());
assert!(storage.find_chain_tip_hash().await.unwrap().is_none());
assert!(storage.find_header_for_height(0).await.unwrap().is_none());
assert!(storage
.find_live_header_for_block_hash("any")
.await
.unwrap()
.is_none());
assert!(storage
.find_live_header_for_merkle_root("any")
.await
.unwrap()
.is_none());
assert!(storage.get_headers_bytes(0, 10).await.unwrap().is_empty());
assert!(storage.get_live_headers().await.unwrap().is_empty());
assert!(storage.find_live_height_range().await.unwrap().is_none());
assert_eq!(storage.header_count().await.unwrap(), 0);
assert!(!storage.live_header_exists("any").await.unwrap());
assert!(storage
.get_headers_by_height_range(0, 10)
.await
.unwrap()
.is_empty());
assert!(storage.get_headers_at_height(0).await.unwrap().is_empty());
assert!(storage.get_active_headers().await.unwrap().is_empty());
assert!(storage.get_fork_headers().await.unwrap().is_empty());
assert!(storage.find_children("any").await.unwrap().is_empty());
}
#[tokio::test]
async fn test_common_ancestor_detection() {
let storage = create_test_storage().await;
for i in 0..3 {
let prev_hash = if i == 0 {
"0".repeat(64)
} else {
format!("hash_{}", i - 1)
};
let header = create_test_header(i, &prev_hash, &format!("hash_{}", i));
storage.insert_header(header).await.unwrap();
}
let h0 = storage
.find_live_header_for_block_hash("hash_0")
.await
.unwrap()
.unwrap();
let h2 = storage
.find_live_header_for_block_hash("hash_2")
.await
.unwrap()
.unwrap();
let ancestor = storage
.find_common_ancestor(&h0, &h2)
.await
.unwrap()
.unwrap();
assert_eq!(ancestor.hash, "hash_0");
}
#[tokio::test]
async fn test_reorg_depth_calculation() {
let storage = create_test_storage().await;
for i in 0..3 {
let prev_hash = if i == 0 {
"0".repeat(64)
} else {
format!("hash_{}", i - 1)
};
let header = create_test_header(i, &prev_hash, &format!("hash_{}", i));
storage.insert_header(header).await.unwrap();
}
let extending = create_test_header(3, "hash_2", "hash_3");
let depth = storage
.find_reorg_depth(&LiveBlockHeader {
previous_hash: "hash_2".to_string(),
..extending
})
.await
.unwrap();
assert_eq!(depth, 0);
}
#[tokio::test]
async fn test_batch_insert_large() {
let storage = create_test_storage().await;
let mut headers = Vec::new();
for i in 0..1000 {
let prev_hash = if i == 0 {
"0".repeat(64)
} else {
format!("hash_{:06}", i - 1)
};
headers.push(create_test_header(i, &prev_hash, &format!("hash_{:06}", i)));
}
let inserted = storage.insert_headers_batch(&headers).await.unwrap();
assert_eq!(inserted, 1000);
let tip = storage
.update_chain_tip_to_highest()
.await
.unwrap()
.unwrap();
assert_eq!(tip.height, 999);
let h500 = storage.find_header_for_height(500).await.unwrap().unwrap();
assert_eq!(h500.height, 500);
}
#[tokio::test]
async fn test_headers_bytes_multiple() {
let storage = create_test_storage().await;
for i in 0..3 {
let prev_hash = if i == 0 {
"0".repeat(64)
} else {
format!("{:064x}", i - 1)
};
let mut header = create_test_header(i, &prev_hash, &format!("{:064x}", i));
header.merkle_root = format!("{:064x}", i + 100);
storage.insert_header(header).await.unwrap();
}
let bytes = storage.get_headers_bytes(0, 3).await.unwrap();
assert_eq!(bytes.len(), 240); }
#[tokio::test]
async fn test_destroy() {
let storage = create_test_storage().await;
let header = create_test_header(0, &"0".repeat(64), "hash_0");
storage.insert_header(header).await.unwrap();
assert_eq!(storage.header_count().await.unwrap(), 1);
storage.destroy().await.unwrap();
assert_eq!(storage.header_count().await.unwrap(), 0);
}
}