use anyhow::Result;
use std::collections::VecDeque;
use std::sync::Mutex;
use tracing::{debug, info};
use super::blockstore::BlockStore;
use blvm_protocol::segwit::Witness;
use blvm_protocol::{Block, BlockHeader};
/// A block queued in memory, waiting to be written to the database by the
/// next batched flush.
struct DeferredBlock {
    // Owned copy of the block to persist (cloned from the caller's reference).
    block: Block,
    // Witness stacks for the block — presumably one inner Vec per
    // transaction; TODO confirm against the caller. May be empty, in which
    // case no witness record is written.
    witnesses: Vec<Vec<Witness>>,
    // Chain height; used (big-endian) as the key in the height index tree.
    height: u64,
    // Block hash computed by BlockStore::get_block_hash; key for the block
    // and witness trees.
    block_hash: [u8; 32],
}
/// Write-buffering wrapper around [`BlockStore`]: incoming blocks are
/// accumulated in memory and written to the database in batches once
/// `flush_threshold` blocks have been queued (or on explicit/Drop flush).
pub struct BufferedBlockStore {
    // Underlying persistent store all writes ultimately land in.
    blockstore: BlockStore,
    // Pending blocks not yet written to the database.
    buffer: Mutex<VecDeque<DeferredBlock>>,
    // Buffer length that triggers an automatic flush in store_block_deferred.
    flush_threshold: usize,
    // Number of blocks flushed to the database so far; does NOT include
    // blocks still sitting in `buffer` (total_stored() adds those in).
    total_stored: Mutex<u64>,
}
impl BufferedBlockStore {
    /// Creates a buffered store that flushes to `blockstore` once
    /// `flush_threshold` blocks have accumulated.
    pub fn new(blockstore: BlockStore, flush_threshold: usize) -> Self {
        Self {
            blockstore,
            buffer: Mutex::new(VecDeque::with_capacity(flush_threshold)),
            flush_threshold,
            total_stored: Mutex::new(0),
        }
    }

    /// Queues a block (plus its witness data) for deferred storage.
    ///
    /// The block and witnesses are cloned into the in-memory buffer. Once the
    /// buffer reaches `flush_threshold` entries, everything is written to the
    /// database via [`Self::flush`].
    ///
    /// # Errors
    /// Returns any error produced by a flush triggered here.
    pub fn store_block_deferred(
        &self,
        block: &Block,
        witnesses: &[Vec<Witness>],
        height: u64,
    ) -> Result<()> {
        let block_hash = self.blockstore.get_block_hash(block);
        let deferred = DeferredBlock {
            block: block.clone(),
            witnesses: witnesses.to_vec(),
            height,
            block_hash,
        };
        // Keep the lock scope tight: decide whether to flush while holding
        // the lock, then release it before doing the slow database work.
        // Recover from poisoning instead of panicking: the buffered data is
        // plain queued values, and flush() may run from Drop during unwind,
        // where a second panic would abort the process.
        let should_flush = {
            let mut buffer = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
            buffer.push_back(deferred);
            buffer.len() >= self.flush_threshold
        };
        if should_flush {
            self.flush()?;
        }
        Ok(())
    }

    /// Stores a recent header immediately; headers are not buffered.
    pub fn store_recent_header(&self, height: u64, header: &BlockHeader) -> Result<()> {
        self.blockstore.store_recent_header(height, header)
    }

    /// Writes all buffered blocks to the database in three batched passes
    /// (block bodies, witnesses, height index) and clears the buffer.
    ///
    /// NOTE(review): if a batch commit below fails, the blocks already
    /// drained from the buffer are dropped rather than re-queued — confirm
    /// that callers treat a flush error as fatal.
    ///
    /// # Errors
    /// Propagates serialization and database batch/commit failures.
    pub fn flush(&self) -> Result<()> {
        // Drain under the lock, then write without holding it so concurrent
        // store_block_deferred() calls are not blocked by database I/O.
        let blocks: Vec<DeferredBlock> = {
            let mut buffer = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
            buffer.drain(..).collect()
        };
        if blocks.is_empty() {
            return Ok(());
        }
        let count = blocks.len();
        let start = std::time::Instant::now();
        debug!("Flushing {} buffered blocks to database", count);
        // Pass 1: block bodies keyed by block hash.
        {
            let blocks_tree = self.blockstore.blocks_tree()?;
            let mut batch = blocks_tree.batch()?;
            for deferred in &blocks {
                let block_data = bincode::serialize(&deferred.block)
                    .map_err(|e| anyhow::anyhow!("Failed to serialize block: {}", e))?;
                batch.put(&deferred.block_hash, &block_data);
            }
            batch.commit()?;
        }
        // Pass 2: witness data, keyed by block hash; blocks with no
        // witnesses get no record at all.
        {
            let witnesses_tree = self.blockstore.witnesses_tree()?;
            let mut batch = witnesses_tree.batch()?;
            for deferred in &blocks {
                if !deferred.witnesses.is_empty() {
                    let witness_data = bincode::serialize(&deferred.witnesses)
                        .map_err(|e| anyhow::anyhow!("Failed to serialize witnesses: {}", e))?;
                    batch.put(&deferred.block_hash, &witness_data);
                }
            }
            batch.commit()?;
        }
        // Pass 3: height -> block-hash index; big-endian keys keep the tree
        // ordered by height.
        {
            let height_tree = self.blockstore.height_tree()?;
            let mut batch = height_tree.batch()?;
            for deferred in &blocks {
                let height_key = deferred.height.to_be_bytes();
                batch.put(&height_key, &deferred.block_hash);
            }
            batch.commit()?;
        }
        *self.total_stored.lock().unwrap_or_else(|e| e.into_inner()) += count as u64;
        let elapsed = start.elapsed();
        // Clamp the divisor so a sub-resolution flush doesn't log "inf".
        let rate = count as f64 / elapsed.as_secs_f64().max(f64::EPSILON);
        info!(
            "Flushed {} blocks in {:?} ({:.0} blocks/sec)",
            count, elapsed, rate
        );
        Ok(())
    }

    /// Number of blocks currently waiting in the buffer.
    pub fn buffered_count(&self) -> usize {
        self.buffer.lock().unwrap_or_else(|e| e.into_inner()).len()
    }

    /// Total blocks handed to this store: already-flushed plus still-buffered.
    pub fn total_stored(&self) -> u64 {
        *self.total_stored.lock().unwrap_or_else(|e| e.into_inner())
            + self.buffered_count() as u64
    }

    /// Read-only access to the wrapped [`BlockStore`].
    pub fn inner(&self) -> &BlockStore {
        &self.blockstore
    }
}
impl Drop for BufferedBlockStore {
    /// Best-effort flush of any remaining buffered blocks on shutdown.
    fn drop(&mut self) {
        // Don't flush while unwinding from a panic: flush() acquires mutexes
        // and can itself panic (e.g. poisoned lock), and a second panic
        // during unwind aborts the whole process. Buffered blocks are lost in
        // that path, which is no worse than the abort.
        if std::thread::panicking() {
            return;
        }
        if let Err(e) = self.flush() {
            tracing::warn!("Failed to flush buffered blocks on shutdown: {}", e);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::database::{create_database, default_backend, Database};
    use std::sync::Arc;
    use tempfile::TempDir;

    /// A freshly-constructed buffered store wraps an empty blockstore.
    #[test]
    fn test_buffered_store_creation() {
        let dir = TempDir::new().unwrap();
        let database = create_database(dir.path(), default_backend(), None).unwrap();
        let db: Arc<dyn Database> = Arc::from(database);
        let store = BlockStore::new(db).unwrap();
        let buffered = BufferedBlockStore::new(store, 1000);
        assert_eq!(buffered.inner().block_count().unwrap(), 0);
    }
}