# Protoblock
[Tests](https://github.com/ouziel-slama/protoblock/actions/workflows/tests.yml) | [Coverage](https://codecov.io/github/ouziel-slama/protoblock) | [Rustfmt](https://github.com/ouziel-slama/protoblock/actions/workflows/fmt.yml) | [Clippy](https://github.com/ouziel-slama/protoblock/actions/workflows/clippy.yml) | [crates.io](https://crates.io/crates/protoblock) | [docs.rs](https://docs.rs/protoblock)
Protoblock is an asynchronous Bitcoin block ingestion pipeline. It multiplexes JSON-RPC batch calls across a pool of workers, pre-processes blocks with user-defined logic, and feeds an ordered queue that guarantees strictly increasing heights before executing stateful `BlockProtocol` hooks. The crate focuses on correctness under reorgs, graceful shutdown, and observability so downstream consumers can build reliable backfills or live processors on top of it.
## Installation
1. **Requirements**
- Rust 1.79+ with the `tokio` multi-threaded runtime available.
- Access to a Bitcoin Core node with RPC enabled (`getblockhash` and `getblock` permissions).
- `bitcoind` should run unpruned (pruning is off by default) if you plan to fetch historical blocks arbitrarily; `-txindex=1` is only needed if your protocol also looks up transactions by txid.
2. **Add the dependency**
- When the crate is published on crates.io:
```toml
protoblock = "0.1"
```
- While developing in-tree, point to the local path or Git repository:
```toml
protoblock = { path = "../protoblock" }
# or
protoblock = { git = "https://github.com/your-org/protoblock", rev = "<commit>" }
```
3. **Build & test**
```bash
cargo check
cargo test --all # unit + mock integration tests
PROTOBLOCK_RUN_REGTESTS=1 cargo test regtest # opt-in real bitcoind tests
```
The regtest suite requires a local `bitcoind` binary on your `PATH`. Leave `PROTOBLOCK_RUN_REGTESTS` unset (the default) to skip those heavier tests.
4. **Optional tooling**
- Enable tracing by setting `RUST_LOG=info` (or a more specific target) before running your binary.
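If you prefer to wire tracing yourself instead of calling `protoblock::init_tracing()` (used in the Quick Start below), the minimal sketch below honors `RUST_LOG`. It assumes your binary adds `tracing-subscriber` (with its `env-filter` feature) to its own dependencies.

```rust,no_run
use tracing_subscriber::EnvFilter;

fn init_logging() {
    // Honor RUST_LOG (e.g. RUST_LOG=info or RUST_LOG=protoblock::metrics=debug),
    // falling back to `info` when the variable is unset.
    tracing_subscriber::fmt()
        .with_env_filter(
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
        )
        .init();
}
```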
## Quick Start
Below is a minimal example that prints block metadata while keeping the heavy lifting inside Protoblock.
```rust,no_run
use anyhow::Result;
use bitcoin::Block;
use protoblock::{
BlockProtocol, FetcherConfig, ProtocolFuture, ProtocolPreProcessFuture, Runner,
};
struct LoggingProtocol;
impl BlockProtocol for LoggingProtocol {
type PreProcessed = Block;
fn pre_process(
&self,
block: Block,
height: u64,
) -> ProtocolPreProcessFuture<Self::PreProcessed> {
// This async hook runs in parallel across workers; its Ok result becomes
// the `process` input once the ordered queue releases this height.
Box::pin(async move {
tracing::info!(height, txs = block.txdata.len(), "downloaded block");
Ok(block)
})
}
fn process<'a>(
&'a mut self,
block: Self::PreProcessed,
height: u64,
) -> ProtocolFuture<'a> {
Box::pin(async move {
// Apply state changes serially (write to DB, forward to Kafka, etc.)
tracing::info!(height, hash = %block.block_hash(), "committing block");
Ok(())
})
}
fn rollback<'a>(&'a mut self, keep_height: u64) -> ProtocolFuture<'a> {
Box::pin(async move {
tracing::warn!(
keep_height,
"chain reorg rollback requested; purge heights above this value"
);
Ok(())
})
}
fn shutdown<'a>(&'a mut self) -> ProtocolFuture<'a> {
Box::pin(async move {
tracing::info!("protocol shutdown hook invoked");
Ok(())
})
}
}
#[tokio::main]
async fn main() -> Result<()> {
protoblock::init_tracing();
let config = FetcherConfig::builder()
.rpc_url("http://127.0.0.1:8332")
.rpc_user("rpcuser")
.rpc_password("rpcpass")
.thread_count(4)
.max_batch_size_mb(4)
.reorg_window_size(12)
.queue_max_size_mb(4_096)
.start_height(0)
.build()?;
let protocol = LoggingProtocol;
let mut runner = Runner::new(config, protocol);
runner.run_until_ctrl_c().await?;
Ok(())
}
```
Run the example binary, mine a few blocks on your regtest node, and observe Protoblock streaming them in order with backpressure and retry handling baked in.
## Examples
`examples/rpc_speed.rs` spins up a no-op protocol so you can measure raw JSON-RPC throughput and worker scheduling overhead. It renders a live progress bar (via `indicatif`) whose length follows the current blockchain tip, and it runs until you send Ctrl-C.
```bash
export PROTOBLOCK_RPC_URL="http://127.0.0.1:8332" # optional (default http://localhost:8332)
export PROTOBLOCK_RPC_USER="rpcuser" # optional (default rpc)
export PROTOBLOCK_RPC_PASSWORD="rpcpass" # optional (default rpc)
export PROTOBLOCK_START_HEIGHT=830000 # optional (default 0)
export PROTOBLOCK_THREAD_COUNT=8 # optional (default 4)
export PROTOBLOCK_MAX_BATCH_MB=8 # optional (default 4)
export PROTOBLOCK_QUEUE_MB=8192 # optional (default 200)
export PROTOBLOCK_REORG_WINDOW=24 # optional (default 12)
export PROTOBLOCK_MAX_REQUEST_MB=20 # optional (default 20)
export PROTOBLOCK_MAX_RESPONSE_MB=20 # optional (default 20)
cargo run --example rpc_speed
```
Adjust the optional knobs to reflect your node’s capabilities. The processor and pre-processor intentionally discard every block so the run isolates RPC performance, and the example will keep streaming until you terminate it.
## Architecture Overview
- **Runner** wires graceful shutdown, signal handling, and owns the root `CancellationToken`.
- **BlocksFetcher** owns the worker pool, ordered queue, progress tracking, telemetry, and the main processing loop that applies `BlockProtocol::process`.
- **Workers** fetch batches of blocks concurrently, call the asynchronous `pre_process` hook, and enqueue `PreProcessedBlock` items. Each worker maintains a double buffer: it launches the next `batch_get_blocks` call as soon as a batch is in hand, lets that RPC run while `pre_process` executes, and aborts the prefetched future if a shutdown, stop, or generation change occurs. Batch sizes grow and shrink automatically based on the observed megabytes per batch.
- **RPC Module (`src/rpc/`)** splits authentication, options, retry instrumentation, metrics, helpers, and the `AsyncRpcClient` into focused submodules while re-exporting the same public surface (`crate::rpc::*`). This keeps transport code isolated, mockable via the `BlockBatchClient` trait, and ready for future transports.
- **OrderedBlockQueue** releases blocks strictly in-order and enforces a configurable byte budget so workers apply backpressure instead of growing memory unbounded, ensuring the sequential `process` stage never observes a height gap even though workers fetch out-of-order.
- **QueueByteSize** is a lightweight trait (re-exported at the crate root as `protoblock::QueueByteSize`) implemented by every protocol’s `PreProcessed` type. Workers call `PreProcessedBlock::queue_bytes()` so telemetry, `BatchSizer`, and queue backpressure track the *post*-`pre_process` footprint instead of the raw RPC payload size; see the sketch after this list.
- **ReorgWindow** tracks recent hashes so the processing loop can detect when a new block contradicts the stored chain, invoke `rollback`, and re-seed the queue safely. The loop passes the fork height (the last height that must remain), so `rollback` implementations must purge any state for heights greater than the provided value.
- **RpcCircuitBreaker** wraps the JSON-RPC client to avoid hammering a failing node. When the breaker is Open, workers pause and allow the node to recover before resuming via Half-Open probes.
- **Telemetry & Metrics Reporter** expose throughput, queue blocks/bytes, and RPC error rates through tracing logs at a configurable interval via `runtime::telemetry::Telemetry`.
- **runtime::fatal::FatalErrorHandler** and `runtime::progress::ProgressTracker` guarantee that fatal protocol errors stop the pipeline, bubble up through `BlocksFetcher::stop`/`Runner::run_until_ctrl_c`, and leave restart decisions to the operator-provided `start_height` (no persistence is performed for you). The tracker reports `start_height - 1` immediately so hosts can observe which heights are already durable after a restart.
- **Shutdown Semantics** intentionally favor quick termination: when the shared cancellation token flips, workers stop fetching immediately and any blocks left in `OrderedBlockQueue` are dropped instead of drained.
- **Tip wait mode** automatically reduces the worker pool to a single task when the pipeline catches up to the current Bitcoin tip. Because Bitcoin produces roughly one block every ten minutes, and never dozens at once, a single worker comfortably keeps pace until a restart re-expands the pool. Control how aggressively that idle worker polls for the next block via `FetcherConfig::tip_idle_backoff`, and tune the background tip refresh cadence independently via `FetcherConfig::tip_refresh_interval`.
- **WorkerPool / TipTracker / ReorgManager / LifecycleHandles** split responsibilities that used to sit inside `BlocksFetcher`. The worker pool manages concurrency, `TipTracker` decides when to downshift near tip, `ReorgManager` handles RPC lookups and rollbacks, and `LifecycleHandles` bundles per-run resources (fatal handler, child cancellation token, metrics reporter, tip refresher). This keeps each module focused and easier to extend.
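The byte-accounting and rollback rules above are easiest to see in code. Below is a minimal sketch that assumes `QueueByteSize` exposes the `queue_bytes()` accessor mentioned above (the exact signature may differ; check docs.rs) and uses a hypothetical `BlockSummary` pre-processed type plus a `purge_above` helper for the post-reorg cleanup:

```rust,no_run
use std::collections::BTreeMap;

use protoblock::QueueByteSize;

/// Hypothetical summary a protocol might return from `pre_process`.
struct BlockSummary {
    txids: Vec<[u8; 32]>,
}

// Assumed trait shape: the README only documents a `queue_bytes()` accessor.
impl QueueByteSize for BlockSummary {
    fn queue_bytes(&self) -> usize {
        // Report the post-`pre_process` footprint, not the raw RPC payload size.
        std::mem::size_of::<Self>() + self.txids.len() * 32
    }
}

/// Rollback rule from the ReorgWindow bullet: keep everything up to and
/// including `keep_height`, discard every height above it.
fn purge_above(state: &mut BTreeMap<u64, BlockSummary>, keep_height: u64) {
    // `split_off` leaves keys <= keep_height in `state` and returns the rest,
    // which is dropped here.
    let _reorged = state.split_off(&(keep_height + 1));
}
```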
Refer to `docs/architecture.md` for a deep dive into each component and how they interact.
## Configuration
`FetcherConfig` centralizes every runtime parameter. Build it via `FetcherConfig::builder()` (or call `FetcherConfig::new(FetcherConfigParams { .. })` if you already have concrete values) to benefit from validation and sensible defaults. The struct’s fields are intentionally private so every instance is validated before it is used.
| Field | Description | Notes |
| --- | --- | --- |
| `rpc_url` | Full HTTP URL of the Bitcoin Core JSON-RPC endpoint. | Must start with `http://` or `https://`. |
| `rpc_user` / `rpc_password` | RPC credentials. | Required; empty strings are rejected. |
| `thread_count` | Number of worker tasks fetching blocks. | Must be ≥ 1; also influences stride between heights. |
| `max_batch_size_mb` | Target megabytes per batch before workers increase/decrease the block count. | Controls throughput vs. memory pressure. |
| `reorg_window_size` | Size of the sliding window (in blocks) used to detect and roll back chain reorganizations. | Choose ≥ the maximum depth you want to tolerate. |
| `queue_max_size_mb` | Maximum megabytes of queued `PreProcessedBlock` data allowed in the ordered queue. | Defaults to 200 MB; workers block when the byte budget is exhausted (backpressure). |
| `start_height` | First height to request when the pipeline starts. | Required; Protoblock always restarts from this height. |
| `rpc_timeout` | Per-RPC timeout duration. | Defaults to 10 seconds if unset. |
| `metrics_interval` | Interval between telemetry snapshots logged to `protoblock::metrics`. | Defaults to 5 seconds; tune to align with your logging/observability cadence. |
| `tip_idle_backoff` | How long workers wait before retrying when they reach the tip or receive `HeightOutOfRange`. | Defaults to 10 seconds; lower values poll for new blocks more aggressively at the cost of extra RPC chatter. |
| `tip_refresh_interval` | Frequency of background `getblockcount` polling performed by the tip tracker. | Defaults to 10 seconds; lower values update the shared tip cache faster without affecting worker idle backoff. |
| `rpc_max_request_body_bytes` | Maximum JSON-RPC HTTP request body size permitted by the client. | Defaults to 10 MB (jsonrpsee default); set this to match your node/frontend proxy limits. |
| `rpc_max_response_body_bytes` | Maximum JSON-RPC HTTP response body size permitted by the client. | Defaults to 10 MB; workers proactively split batches so responses remain under this ceiling. |
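The timing and size knobs in the lower half of the table are not shown in the Quick Start. A hedged builder sketch follows; it assumes the setter names mirror the documented field names (the Quick Start confirms this only for the fields it uses) and that the duration-valued settings accept `std::time::Duration`:

```rust,no_run
use std::time::Duration;

use protoblock::FetcherConfig;

fn build_config() -> anyhow::Result<FetcherConfig> {
    // Setter names for the timing/limit knobs are assumptions that mirror the
    // documented field names; verify them on docs.rs before relying on this.
    let config = FetcherConfig::builder()
        .rpc_url("http://127.0.0.1:8332")
        .rpc_user("rpcuser")
        .rpc_password("rpcpass")
        .thread_count(8)
        .max_batch_size_mb(8)
        .reorg_window_size(24)
        .queue_max_size_mb(8_192)
        .start_height(830_000)
        .rpc_timeout(Duration::from_secs(10))
        .metrics_interval(Duration::from_secs(5))
        .tip_idle_backoff(Duration::from_secs(10))
        .tip_refresh_interval(Duration::from_secs(10))
        .rpc_max_request_body_bytes(20 * 1024 * 1024) // bytes, not megabytes
        .rpc_max_response_body_bytes(20 * 1024 * 1024)
        .build()?;
    Ok(config)
}
```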
Tips:
- Replace any `FetcherConfig { .. }` literals with `FetcherConfig::builder()` or `FetcherConfig::new(FetcherConfigParams { .. })`; direct struct construction is no longer supported.
- Reuse `Runner::cancellation_token()` if your host application already has a shutdown signal source (see the sketch after these tips).
- Monitor the tracing target `protoblock::metrics` to observe queue blocks, queue bytes, throughput, and RPC failures in real time.
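For the `Runner::cancellation_token()` tip, a minimal bridge sketch is shown below. It assumes the token is a `tokio_util::sync::CancellationToken` (the architecture notes say the Runner owns the root `CancellationToken`, but the concrete return type is an assumption) and that your host exposes shutdown as a `tokio::sync::watch` channel:

```rust,no_run
use tokio_util::sync::CancellationToken;

/// Hypothetical helper: bridge a host-side shutdown channel to Protoblock's root token.
fn bridge_host_shutdown(
    token: CancellationToken,
    mut host_shutdown: tokio::sync::watch::Receiver<bool>,
) {
    tokio::spawn(async move {
        // When the host flips its shutdown flag (or drops the sender), cancel the pipeline.
        let _ = host_shutdown.changed().await;
        token.cancel();
    });
}
```

A call such as `bridge_host_shutdown(runner.cancellation_token(), shutdown_rx)` before awaiting `runner.run_until_ctrl_c()` lets either Ctrl-C or the host signal stop the pipeline.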
### Upgrading existing configs
If you previously instantiated `FetcherConfig` via a struct literal, switch to the builder pattern or `FetcherConfig::new(FetcherConfigParams { .. })`. Both paths perform validation before returning, eliminating the window where an invalid configuration could reach the fetcher.
## Documentation Set
- `docs/architecture.md` — details the internal pipeline and concurrency model.
- `docs/usage.md` — walks through configuration, protocol implementation, and recommended operational practices.
Both documents complement this README.
## License
Protoblock is dual-licensed under the MIT License (`LICENSE-MIT`) or the Apache License, Version 2.0 (`LICENSE-APACHE`), matching the approach used in `rollblock`. You may choose either license to use, copy, modify, and distribute this project.