1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result, anyhow, bail};
use blockstore::{Store, StoreOptions};
// fjall 3 renamed its concepts: the old `Keyspace` (the whole store) is now a
// `Database`, and the old `PartitionHandle` (a column family) is now a
// `Keyspace`. So `Inner::db` is the store and the `Keyspace` fields are what
// used to be partitions.
use fjall::{Database, Keyspace, KeyspaceCreateOptions, PersistMode};
use tokio::sync::RwLock;
use tracing::debug;
/// Shared storage handle. Cheap to clone (Arcs inside).
///
/// Every clone refers to the same `Inner`: one lazily-opened blockstore
/// behind an `RwLock`, one fjall database, and the `hash_to_height` /
/// `tx_to_block` index keyspaces. Construct it with `Storage::open`.
#[derive(Clone, Debug)]
pub struct Storage {
/// Shared state; cloning a `Storage` clones only this `Arc`.
inner: Arc<Inner>,
}
/// State shared by all clones of `Storage`, built once in `Storage::open`.
///
/// `Debug` is implemented by hand for this type and prints only `bs_dir`.
struct Inner {
/// Directory holding the blockstore files (`<data_dir>/blocks`). Passed to
/// `Store::open` and exposed through `Storage::blockdb_dir`.
bs_dir: std::path::PathBuf,
/// `RwLock` (not `Mutex`) so block reads run concurrently. The blockstore
/// reads via positional `read_at` (no shared file cursor) and its only
/// interior mutability is atomics + a `parking_lot::Mutex`, so it is `Sync`
/// and many readers can share `&Store` safely. Only the rare lazy-open and
/// writes (`put`) take the exclusive write lock; the hot read path takes a
/// shared read lock and no longer serializes on a single mutex.
///
/// `None` until a blockstore exists: `Storage::open` fills it only when
/// `blockdb.idx` is already on disk, otherwise the first `put` creates it.
store: RwLock<Option<Store>>,
/// The fjall database opened at `<data_dir>/index`. The keyspaces below
/// come from it; it is also used to build index batches in `put` and to
/// flush the journal in `persist`.
db: Database,
/// `block_hash (32) → height (u64 LE)` (8 bytes).
/// Populated by `put`; read by `get_by_hash`.
hash_to_height: Keyspace,
/// `tx_hash (32) → height (u64 LE) ++ index (u32 LE)` (12 bytes).
/// Populated on ingest; powers `eth_getTransactionByHash`.
tx_to_block: Keyspace,
/// When the blockstore is created fresh, anchor its `minimum_height`
/// here instead of at the first block written. Set in `--mirror-from`
/// mode to the upstream's earliest retained height so backfill can
/// reproduce the whole upstream range rather than only forward from the
/// tip. `None` keeps the original "anchor at first ingest" behavior.
/// Ignored when the store already exists (its floor is already baked in).
anchor_floor: Option<u64>,
}
/// Hand-written `Debug` for `Inner`: only `bs_dir` is printed, and the
/// remaining fields are elided (rendered as `..`).
impl std::fmt::Debug for Inner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Inner");
        out.field("bs_dir", &self.bs_dir);
        out.finish_non_exhaustive()
    }
}
impl Storage {
/// Open (or create) the storage at `data_dir`. The upstream-reported
/// chain ID (queried via `eth_chainId` at startup, then passed in
/// decimal here) is stamped into a `meta` fjall keyspace on first open
/// and verified on every subsequent open; a mismatch returns an error
/// rather than silently mixing data. Anchoring on chain ID rather than
/// a user-supplied label means `--rpc-url` overrides are caught too.
pub fn open(data_dir: &Path, chain_id: u64, anchor_floor: Option<u64>) -> Result<Self> {
let bs_dir = data_dir.join("blocks");
let idx_dir = data_dir.join("index");
std::fs::create_dir_all(&bs_dir)?;
let db = Database::builder(&idx_dir).open()?;
let hash_to_height = db.keyspace("hash_to_height", KeyspaceCreateOptions::default)?;
debug!(
approx_len = hash_to_height.approximate_len(),
"opened keyspace hash_to_height",
);
let tx_to_block = db.keyspace("tx_to_block", KeyspaceCreateOptions::default)?;
debug!(
approx_len = tx_to_block.approximate_len(),
"opened keyspace tx_to_block",
);
// Chain-ID stamp lives in its own `meta` keyspace. We only need it
// at open time, so the handle is scoped to this block (not held in
// `Inner`).
{
let meta = db.keyspace("meta", KeyspaceCreateOptions::default)?;
let chain_id_str = chain_id.to_string();
if let Some(slice) = meta.get("chain_id")? {
let stored = std::str::from_utf8(slice.as_ref())
.context("meta/chain_id is not valid UTF-8")?;
if stored != chain_id_str {
bail!(
"data dir {} is stamped for chain_id {}, refusing to open with chain_id {}",
data_dir.display(),
stored,
chain_id_str,
);
}
debug!(chain_id = stored, "chain_id stamp verified");
} else {
meta.insert("chain_id", chain_id_str.as_str())?;
db.persist(PersistMode::Buffer)?;
debug!(chain_id = %chain_id_str, "chain_id stamp written");
}
}
let store = if bs_dir.join("blockdb.idx").exists() {
let s = Store::open(&bs_dir, &bs_dir, StoreOptions::default())
.context("opening blockstore")?;
debug!(
min_height = s.min_block_height(),
max_contiguous = s.max_contiguous_height(),
high_water = s.height_highwater(),
"opened blockstore",
);
Some(s)
} else {
debug!("blockstore not yet created (no blocks ingested)");
None
};
Ok(Self {
inner: Arc::new(Inner {
bs_dir,
store: RwLock::new(store),
db,
hash_to_height,
tx_to_block,
anchor_floor,
}),
})
}
/// Largest block height the blockstore has recorded.
///
/// Returns 0 while no blockstore has been opened, and also when the
/// blocking task cannot be joined. Reads the blockstore's
/// `height_highwater`, so holes in the stored range (from a WS reconnect or
/// a restart) do not pin the reported tip to the floor of the first gap.
pub async fn high_water(&self) -> u64 {
    let shared = Arc::clone(&self.inner);
    let lookup = tokio::task::spawn_blocking(move || {
        let guard = shared.store.blocking_read();
        match guard.as_ref() {
            Some(store) => store.height_highwater(),
            None => 0,
        }
    });
    lookup.await.unwrap_or(0)
}
/// Smallest block height the blockstore covers (its `min_block_height`).
///
/// Returns 0 while no blockstore has been opened — nothing has been
/// ingested yet — and also when the blocking task cannot be joined. The
/// floor is fixed when the blockstore is first created.
pub async fn min_height(&self) -> u64 {
    let shared = Arc::clone(&self.inner);
    let lookup = tokio::task::spawn_blocking(move || {
        let guard = shared.store.blocking_read();
        if let Some(store) = guard.as_ref() {
            store.min_block_height()
        } else {
            0
        }
    });
    lookup.await.unwrap_or(0)
}
/// Path of the directory where the blockstore keeps its files (block bytes
/// plus the `.idx` index).
pub fn blockdb_dir(&self) -> &Path {
    self.inner.bs_dir.as_path()
}
/// Top of the gap-free prefix: the highest `H` for which every block in
/// `[min_block_height, H]` is present. The backfill worker uses it —
/// `H + 1` is the next hole to fill.
///
/// Returns 0 while no blockstore has been opened, and also when the
/// blocking task cannot be joined.
pub async fn max_contiguous_height(&self) -> u64 {
    let shared = Arc::clone(&self.inner);
    let lookup = tokio::task::spawn_blocking(move || {
        let guard = shared.store.blocking_read();
        let Some(store) = guard.as_ref() else {
            return 0;
        };
        store.max_contiguous_height()
    });
    lookup.await.unwrap_or(0)
}
/// Fetch the stored bytes of the block at `height`.
///
/// `Ok(None)` is the "we don't have it" signal (it drives the 421 response
/// in the HTTP layer) and is returned when:
///
/// * no blockstore has been opened yet,
/// * `height` lies below the blockstore's `min_block_height` or above its
///   high-water mark, or
/// * `height` is in range but falls in a gap.
///
/// # Errors
///
/// Fails if the blockstore read fails or the blocking task cannot be
/// joined.
pub async fn get_by_height(&self, height: u64) -> Result<Option<Vec<u8>>> {
    let shared = Arc::clone(&self.inner);
    let task = tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
        // Shared lock: concurrent readers do not block one another.
        let guard = shared.store.blocking_read();
        let store = match guard.as_ref() {
            Some(store) => store,
            None => {
                debug!(height, "block not present: store not opened yet");
                return Ok(None);
            }
        };
        let floor = store.min_block_height();
        let ceiling = store.height_highwater();
        if !(floor..=ceiling).contains(&height) {
            debug!(
                height,
                min = floor,
                high_water = ceiling,
                "block not present: out of range",
            );
            return Ok(None);
        }
        match store.read_block(height)? {
            Some(block) => {
                debug!(height, bytes = block.as_ref().len(), "read block by height");
                Ok(Some(block.as_ref().to_vec()))
            }
            None => {
                debug!(height, "block not present: gap in stored range");
                Ok(None)
            }
        }
    });
    task.await?
}
/// Fetch a block's stored bytes by its 32-byte hash.
///
/// Resolves the hash to a height through the `hash_to_height` keyspace and
/// then delegates to `get_by_height`. Returns `Ok(None)` when the hash is
/// not indexed, or when the height it maps to is not stored.
///
/// # Errors
///
/// Fails if the index lookup fails, if the index entry is not exactly
/// 8 bytes, or if `get_by_height` fails.
pub async fn get_by_hash(&self, hash: [u8; 32]) -> Result<Option<Vec<u8>>> {
    let entry = match self.inner.hash_to_height.get(hash)? {
        Some(entry) => entry,
        None => {
            debug!(hash = %hex::encode(hash), "hash_to_height miss");
            return Ok(None);
        }
    };
    // Index values are the height as a little-endian u64.
    let raw: &[u8] = entry.as_ref();
    let height = u64::from_le_bytes(
        raw.try_into()
            .map_err(|_| anyhow!("bad height entry in index"))?,
    );
    debug!(hash = %hex::encode(hash), height, "hash_to_height hit");
    self.get_by_height(height).await
}
/// Find where a transaction lives.
///
/// Returns `Ok(Some((height, tx_index)))` when the transaction hash was
/// indexed during ingest, and `Ok(None)` otherwise.
///
/// # Errors
///
/// Fails if the `tx_to_block` lookup fails or the stored entry is not
/// exactly 12 bytes.
pub fn get_tx_location(&self, tx_hash: [u8; 32]) -> Result<Option<(u64, u32)>> {
    let entry = match self.inner.tx_to_block.get(tx_hash)? {
        Some(entry) => entry,
        None => {
            debug!(tx_hash = %hex::encode(tx_hash), "tx_to_block miss");
            return Ok(None);
        }
    };
    // Entry layout: height (u64 LE, 8 bytes) followed by index (u32 LE, 4 bytes).
    let raw: [u8; 12] = entry
        .as_ref()
        .try_into()
        .map_err(|_| anyhow!("bad tx_to_block entry"))?;
    let (height_le, idx_le) = raw.split_at(8);
    let height = u64::from_le_bytes(height_le.try_into().expect("8 bytes"));
    let idx = u32::from_le_bytes(idx_le.try_into().expect("4 bytes"));
    debug!(tx_hash = %hex::encode(tx_hash), height, idx, "tx_to_block hit");
    Ok(Some((height, idx)))
}
/// Insert a block at the given height and update both indexes
/// (`hash → height` and `tx_hash → (height, idx)`). Lazily opens the
/// blockstore on the very first call so its `minimum_height` can be
/// anchored — at the configured `anchor_floor` (mirror mode) when set and
/// `<= height`, otherwise at `height` itself.
///
/// # Write ordering and partial-failure behavior
///
/// The writes happen in two stages:
///
/// 1. Blockstore `write_block` (height → bytes), then
/// 2. A single atomic fjall `Batch` covering all index writes
/// (`hash_to_height` + each `tx_to_block` entry).
///
/// The fjall batch is all-or-nothing — within the batch there is no
/// "some tx indexes written, some not" state. The remaining failure
/// window is a crash between stage 1 and stage 2: the blockstore has
/// the block but no fjall index points at it. Symptom: lookups by
/// hash / tx for that one block return 421; `eth_getBlockBy
/// Number(<height>)` and `eth_blockNumber` still succeed. The
/// blockstore's `max_contiguous_height` will have advanced past this
/// height, so the backfill worker won't refill the indexes
/// automatically — accept this as a known mild-corruption mode for
/// the prototype, fixable later by writing the fjall batch first or
/// by moving block bytes into fjall.
pub async fn put(
&self,
height: u64,
hash: [u8; 32],
tx_hashes: &[[u8; 32]],
block_bytes: Vec<u8>,
) -> Result<()> {
let inner = Arc::clone(&self.inner);
let bs_dir = inner.bs_dir.clone();
tokio::task::spawn_blocking(move || -> Result<()> {
let mut guard = inner.store.blocking_write();
if guard.is_none() {
// Anchor at the configured floor when set (mirror mode), so the
// store can hold the whole upstream range; otherwise anchor at
// this first block. Clamp to `height` so we never set a floor
// above the block we're about to write (blockstore requires
// minimum_height <= every stored height).
let minimum_height = inner
.anchor_floor
.filter(|&f| f <= height)
.unwrap_or(height);
let opts = StoreOptions {
truncate: true,
minimum_height,
..StoreOptions::default()
};
let s = Store::open(&bs_dir, &bs_dir, opts).context("opening blockstore")?;
*guard = Some(s);
}
guard
.as_ref()
.expect("store initialized above")
.write_block(height, &block_bytes)?;
Ok(())
})
.await??;
let mut batch = self.inner.db.batch();
debug!(
height,
hash = %hex::encode(hash),
"indexed hash_to_height",
);
batch.insert(&self.inner.hash_to_height, hash, height.to_le_bytes());
for (idx, tx_hash) in tx_hashes.iter().enumerate() {
let mut value = [0u8; 12];
value[0..8].copy_from_slice(&height.to_le_bytes());
value[8..12].copy_from_slice(&(idx as u32).to_le_bytes());
debug!(
height,
idx,
tx_hash = %hex::encode(tx_hash),
"indexed tx_to_block",
);
batch.insert(&self.inner.tx_to_block, *tx_hash, value);
}
batch.commit()?;
// The batch's default durability is PersistMode::Buffer (no per-write
// fsync); the journal tail lives in the page cache until `persist` is
// called on shutdown. No explicit persist needed here.
Ok(())
}
/// Make the fjall journal durable on disk.
///
/// Steady-state writes use `PersistMode::Buffer` (no fsync), so the journal
/// tail lives in the OS page cache — fine for a graceful process exit, lost
/// on power failure. Call this on shutdown to `fsync` the journal with
/// `PersistMode::SyncAll`. The blockstore is not flushed here; it separately
/// checkpoints in its own `Drop` when the runtime tears the tasks down.
///
/// # Errors
///
/// Fails if the fjall persist fails or the blocking task cannot be joined.
pub async fn persist(&self) -> Result<()> {
    let shared = Arc::clone(&self.inner);
    let flushed =
        tokio::task::spawn_blocking(move || shared.db.persist(PersistMode::SyncAll)).await?;
    flushed?;
    Ok(())
}
}