tycho_core/storage/
tables.rs

1use bytesize::ByteSize;
2use tycho_storage::kv::{
3    DEFAULT_MIN_BLOB_SIZE, TableContext, default_block_based_table_factory,
4    optimize_for_level_compaction, optimize_for_point_lookup, refcount, with_blob_db,
5};
6use weedb::rocksdb::{
7    self, BlockBasedIndexType, BlockBasedOptions, CompactionPri, DBCompressionType,
8    DataBlockIndexType, MemtableFactory, MergeOperands, Options, ReadOptions, SliceTransform,
9};
10use weedb::{ColumnFamily, ColumnFamilyOptions};
11
/// Stores generic node parameters
/// - Key: `...`
/// - Value: `...`
pub struct State;

impl ColumnFamily for State {
    const NAME: &'static str = "state";
}

impl ColumnFamilyOptions<TableContext> for State {
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        default_block_based_table_factory(opts, ctx);

        // Skip bloom filters for the bottommost level; most lookups here hit.
        opts.set_optimize_filters_for_hits(true);
        // Accessed by exact key, so tune for point reads.
        optimize_for_point_lookup(opts, ctx);
    }
}
29
/// Stores prepared archives
/// - Key: `u32 (BE)` (archive id)
/// - Value: `Vec<u8>` (archive block ids)
pub struct ArchiveBlockIds;

impl ColumnFamily for ArchiveBlockIds {
    const NAME: &'static str = "archive_block_ids";
}

impl ColumnFamilyOptions<TableContext> for ArchiveBlockIds {
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        default_block_based_table_factory(opts, ctx);

        // Uses 128MB * 6 = 768 MB of write buffers
        optimize_for_level_compaction(opts, ctx, ByteSize::mib(128), 6);

        // New block ids are appended to the current value via the merge operator.
        opts.set_merge_operator_associative("archive_data_merge", archive_data_merge);
        // data is hardly compressible and dataset is small
        opts.set_compression_type(DBCompressionType::None);
        // Large values go to blob files, also uncompressed.
        with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
    }
}
52
/// Stores split archives
/// - Key: `u32 (BE)` (archive id) + `u64 (BE)` (chunk index)
/// - Value: `Vec<u8>` (archive data chunk)
pub struct Archives;

impl Archives {
    // archive id (u32) + chunk index (u64), see the key doc above.
    pub const KEY_LEN: usize = 4 + 8;
}

impl ColumnFamily for Archives {
    const NAME: &'static str = "archives";
}

impl ColumnFamilyOptions<TableContext> for Archives {
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        default_block_based_table_factory(opts, ctx);

        // Uses 512MB * 8 = 4 GB of write buffers
        optimize_for_level_compaction(opts, ctx, ByteSize::mib(512), 8);

        // data is already compressed
        opts.set_compression_type(DBCompressionType::None);
        with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
    }
}
78
/// Maps block root hash to block meta
/// - Key: `[u8; 32]`
/// - Value: `BlockMeta`
pub struct BlockHandles;

impl ColumnFamily for BlockHandles {
    const NAME: &'static str = "block_handles";

    fn read_options(opts: &mut ReadOptions) {
        // Checksum verification is disabled for faster reads.
        opts.set_verify_checksums(false);
    }
}

impl ColumnFamilyOptions<TableContext> for BlockHandles {
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        // Uses 128MB * 6 = 768 MB of write buffers
        optimize_for_level_compaction(opts, ctx, ByteSize::mib(128), 6);

        // Meta flag bytes are combined with bitwise OR via the merge operator.
        opts.set_merge_operator_associative("block_handle_merge", block_handle_merge);
        optimize_for_point_lookup(opts, ctx);
    }
}
101
/// Maps seqno to key block id
/// - Key: `u32 (BE)`
/// - Value: `BlockIdExt`
pub struct KeyBlocks;

impl ColumnFamily for KeyBlocks {
    const NAME: &'static str = "key_blocks";

    fn read_options(opts: &mut ReadOptions) {
        // Checksum verification is disabled for faster reads.
        opts.set_verify_checksums(false);
    }
}

// Default column family options are sufficient for this table.
impl ColumnFamilyOptions<TableContext> for KeyBlocks {}
116
117/// Maps block id (partial) to file hash
118pub struct FullBlockIds;
119
120impl ColumnFamily for FullBlockIds {
121    const NAME: &'static str = "full_block_ids";
122
123    fn read_options(opts: &mut rocksdb::ReadOptions) {
124        opts.set_verify_checksums(false);
125    }
126}
127
128impl ColumnFamilyOptions<TableContext> for FullBlockIds {
129    fn options(opts: &mut rocksdb::Options, ctx: &mut TableContext) {
130        default_block_based_table_factory(opts, ctx);
131    }
132}
133
/// Maps package entry id to entry data
/// - Key: `BlockIdShort (16 bytes), [u8; 32], package type (1 byte)` <=> (`PackageEntryKey`)
/// - Value: `Vec<u8>` (block/proof/queue diff data)
pub struct PackageEntries;

impl PackageEntries {
    // BlockIdShort (4 + 8 + 4) + root hash (32) + package type (1),
    // matching the key layout documented above.
    pub const KEY_LEN: usize = 4 + 8 + 4 + 32 + 1;
}

impl ColumnFamily for PackageEntries {
    const NAME: &'static str = "package_entries";
}
146
147impl ColumnFamilyOptions<TableContext> for PackageEntries {
148    fn options(opts: &mut Options, ctx: &mut TableContext) {
149        default_block_based_table_factory(opts, ctx);
150        opts.set_compression_type(DBCompressionType::Zstd);
151
152        // Uses 8 * 512MB = 4GB
153        let buffer_size = ByteSize::mib(512);
154        let buffers_to_merge = 2;
155        let buffer_count = 8;
156        opts.set_write_buffer_size(buffer_size.as_u64() as _);
157        opts.set_max_write_buffer_number(buffer_count);
158        opts.set_min_write_buffer_number_to_merge(buffers_to_merge); // allow early flush
159        ctx.track_buffer_usage(
160            ByteSize(buffer_size.as_u64() * buffers_to_merge as u64),
161            ByteSize(buffer_size.as_u64() * buffer_count as u64),
162        );
163
164        with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::Zstd);
165
166        // This flag specifies that the implementation should optimize the filters
167        // mainly for cases where keys are found rather than also optimize for keys
168        // missed. This would be used in cases where the application knows that
169        // there are very few misses or the performance in the case of misses is not
170        // important.
171        //
172        // For now, this flag allows us to not store filters for the last level i.e
173        // the largest level which contains data of the LSM store. For keys which
174        // are hits, the filters in this level are not useful because we will search
175        // for the data anyway. NOTE: the filters in other levels are still useful
176        // even for key hit because they tell us whether to look in that level or go
177        // to the higher level.
178        // https://github.com/facebook/rocksdb/blob/81aeb15988e43c49952c795e32e5c8b224793589/include/rocksdb/advanced_options.h#L846
179        opts.set_optimize_filters_for_hits(true);
180    }
181}
182
/// Maps block id to compressed block data
/// - Key: `BlockIdShort (16 bytes), [u8; 32], chunk index (4 byte)`
/// - Value: `Vec<u8>`
pub struct BlockDataEntries;

impl BlockDataEntries {
    // BlockIdShort (4 + 8 + 4) + root hash (32) + chunk index (4),
    // matching the key layout documented above.
    pub const KEY_LEN: usize = 4 + 8 + 4 + 32 + 4;
}

impl ColumnFamily for BlockDataEntries {
    const NAME: &'static str = "block_data_entries";
}
195
196impl ColumnFamilyOptions<TableContext> for BlockDataEntries {
197    fn options(opts: &mut Options, ctx: &mut TableContext) {
198        default_block_based_table_factory(opts, ctx);
199
200        // Uses 128MB * 6 = 768 GB
201        optimize_for_level_compaction(opts, ctx, ByteSize::mib(127), 6);
202
203        // data is already compressed
204        opts.set_compression_type(DBCompressionType::None);
205        with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
206    }
207}
208
/// Maps `BlockId` to root cell hash
/// - Key: `BlockId`
/// - Value: `[u8; 32]`
pub struct ShardStates;

impl ColumnFamily for ShardStates {
    const NAME: &'static str = "shard_states";
}

impl ColumnFamilyOptions<TableContext> for ShardStates {
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        default_block_based_table_factory(opts, ctx);
        opts.set_compression_type(DBCompressionType::Zstd);
    }
}
224
/// Stores cells data
/// - Key: `[u8; 32]` (cell repr hash)
/// - Value: `StorageCell`
pub struct Cells;

impl ColumnFamily for Cells {
    const NAME: &'static str = "cells";
}
233
234impl ColumnFamilyOptions<TableContext> for Cells {
235    fn options(opts: &mut Options, ctx: &mut TableContext) {
236        opts.set_level_compaction_dynamic_level_bytes(true);
237
238        opts.set_merge_operator_associative("cell_merge", refcount::merge_operator);
239        opts.set_compaction_filter("cell_compaction", refcount::compaction_filter);
240
241        // optimize for bulk inserts and single writer
242
243        // Uses 8 * 512MB = 4GB
244        let buffer_size = ByteSize::mib(512);
245        let buffers_to_merge = 2;
246        let buffer_count = 8;
247        opts.set_write_buffer_size(buffer_size.as_u64() as _);
248        opts.set_max_write_buffer_number(buffer_count);
249        opts.set_min_write_buffer_number_to_merge(buffers_to_merge); // allow early flush
250        ctx.track_buffer_usage(
251            ByteSize(buffer_size.as_u64() * buffers_to_merge as u64),
252            ByteSize(buffer_size.as_u64() * buffer_count as u64),
253        );
254
255        opts.set_max_successive_merges(0); // it will eat cpu, we are doing first merge in hashmap anyway.
256
257        // - Write batch size: 500K entries
258        // - Entry size: ~244 bytes (32 SHA + 8 seq + 192 value + 12 overhead)
259        // - Memtable size: 512MB
260
261        // 1. Entries per memtable = 512MB / 244B ≈ 2.2M entries
262        // 2. Target bucket load factor = 10-12 entries per bucket (RocksDB recommendation)
263        // 3. Bucket count = entries / target_load = 2.2M / 11 ≈ 200K
264        opts.set_memtable_factory(MemtableFactory::HashLinkList {
265            bucket_count: 200_000,
266        });
267
268        opts.set_memtable_prefix_bloom_ratio(0.1); // we use hash-based memtable so bloom filter is not that useful
269        opts.set_bloom_locality(1); // Optimize bloom filter locality
270
271        let mut block_factory = BlockBasedOptions::default();
272
273        // todo: some how make block cache separate for cells,
274        // using 3/4 of all available cache space
275        block_factory.set_block_cache(&ctx.caches().block_cache);
276
277        // 10 bits per key, stored at the end of the sst
278        block_factory.set_bloom_filter(10.0, false);
279        block_factory.set_optimize_filters_for_memory(true);
280        block_factory.set_whole_key_filtering(true);
281
282        // to match fs block size
283        block_factory.set_block_size(4096);
284        block_factory.set_format_version(6);
285
286        // we have 4096 / 256 = 16 keys per block, so binary search is enough
287        block_factory.set_data_block_index_type(DataBlockIndexType::BinarySearch);
288
289        block_factory.set_index_type(BlockBasedIndexType::HashSearch);
290        block_factory.set_pin_l0_filter_and_index_blocks_in_cache(true);
291
292        opts.set_block_based_table_factory(&block_factory);
293        opts.set_prefix_extractor(SliceTransform::create_noop());
294
295        opts.set_memtable_whole_key_filtering(true);
296        opts.set_memtable_prefix_bloom_ratio(0.25);
297
298        opts.set_compression_type(DBCompressionType::None);
299
300        opts.set_compaction_pri(CompactionPri::OldestSmallestSeqFirst);
301        opts.set_level_zero_file_num_compaction_trigger(8);
302
303        opts.set_target_file_size_base(512 * 1024 * 1024); // smaller files for more efficient GC
304
305        opts.set_max_bytes_for_level_base(4 * 1024 * 1024 * 1024); // 4GB per level
306        opts.set_max_bytes_for_level_multiplier(8.0);
307
308        // 512MB per file; less files - less compactions
309        opts.set_target_file_size_base(512 * 1024 * 1024);
310        // L1: 4GB
311        // L2: ~32GB
312        // L3: ~256GB
313        // L4: ~2TB
314        opts.set_num_levels(5);
315
316        opts.set_optimize_filters_for_hits(true);
317
318        // we have our own cache and don't want `kcompactd` goes brrr scenario
319        opts.set_use_direct_reads(true);
320        opts.set_use_direct_io_for_flush_and_compaction(true);
321
322        opts.add_compact_on_deletion_collector_factory(
323            100, // N: examine 100 consecutive entries
324            // Small enough window to detect local delete patterns
325            // Large enough to avoid spurious compactions
326            45, // D: trigger on 45 deletions in window
327            // Balance between the space reclaim and compaction frequency
328            // ~45% deletion density trigger
329            0.5, /* deletion_ratio: trigger if 50% of a total file is deleted
330                  * Backup trigger for overall file health
331                  * Higher than window trigger to prefer local optimization */
332        );
333
334        // single writer optimizations
335        opts.set_enable_write_thread_adaptive_yield(false);
336        opts.set_allow_concurrent_memtable_write(false);
337        opts.set_enable_pipelined_write(true);
338        opts.set_inplace_update_support(false);
339        opts.set_unordered_write(true); // we don't use snapshots
340        opts.set_avoid_unnecessary_blocking_io(true); // schedule unnecessary IO in background;
341
342        opts.set_auto_tuned_ratelimiter(
343            256 * 1024 * 1024, // 256MB/s base rate
344            100_000,           // 100ms refill (standard value)
345            10,                // fairness (standard value)
346        );
347
348        opts.set_periodic_compaction_seconds(3600 * 24); // force compaction once a day
349    }
350}
351
/// Stores temp cells data
/// - Key: `ton_types::UInt256` (cell repr hash)
/// - Value: `StorageCell`
pub struct TempCells;

impl ColumnFamily for TempCells {
    const NAME: &'static str = "temp_cells";
}
360
361impl ColumnFamilyOptions<TableContext> for TempCells {
362    fn options(opts: &mut rocksdb::Options, ctx: &mut TableContext) {
363        let mut block_factory = BlockBasedOptions::default();
364        block_factory.set_block_cache(&ctx.caches().block_cache);
365        block_factory.set_data_block_index_type(DataBlockIndexType::BinaryAndHash);
366        block_factory.set_whole_key_filtering(true);
367        block_factory.set_checksum_type(rocksdb::ChecksumType::NoChecksum);
368
369        block_factory.set_bloom_filter(10.0, false);
370        block_factory.set_block_size(16 * 1024);
371        block_factory.set_format_version(5);
372
373        opts.set_optimize_filters_for_hits(true);
374    }
375}
376
/// Stores connections data
/// - Key: `BlockIdShort (16 bytes), [u8; 32] (block root hash), connection type (1 byte)`
/// - Value: `BlockId (LE)`
pub struct BlockConnections;

impl BlockConnections {
    // BlockIdShort (4 + 8 + 4) + root hash (32) + connection type (1),
    // matching the key layout documented above.
    pub const KEY_LEN: usize = 4 + 8 + 4 + 32 + 1;
}

impl ColumnFamily for BlockConnections {
    const NAME: &'static str = "block_connections";

    fn read_options(opts: &mut ReadOptions) {
        // Checksum verification is disabled for faster reads.
        opts.set_verify_checksums(false);
    }
}

impl ColumnFamilyOptions<TableContext> for BlockConnections {
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        default_block_based_table_factory(opts, ctx);
        // Entries are fetched by exact key, so tune for point reads.
        optimize_for_point_lookup(opts, ctx);
    }
}
400
// === Old collator storage ===

// TODO should be deleted
pub struct ShardInternalMessagesOld;
impl ColumnFamily for ShardInternalMessagesOld {
    const NAME: &'static str = "shard_int_msgs";

    fn read_options(opts: &mut ReadOptions) {
        // Checksum verification stays enabled for this legacy table.
        opts.set_verify_checksums(true);
    }
}

impl ColumnFamilyOptions<TableContext> for ShardInternalMessagesOld {
    // Defaults only; this legacy table is slated for removal.
    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
}
416
// TODO should be deleted
pub struct ShardInternalMessagesUncommitedOld;
impl ColumnFamily for ShardInternalMessagesUncommitedOld {
    const NAME: &'static str = "shard_int_msgs_uncommited";

    fn read_options(opts: &mut ReadOptions) {
        // Checksum verification stays enabled for this legacy table.
        opts.set_verify_checksums(true);
    }
}
impl ColumnFamilyOptions<TableContext> for ShardInternalMessagesUncommitedOld {
    // Defaults only; this legacy table is slated for removal.
    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
}
429
// TODO should be deleted
pub struct InternalMessageStatsOld;
impl ColumnFamily for InternalMessageStatsOld {
    const NAME: &'static str = "int_msg_stats";

    fn read_options(opts: &mut ReadOptions) {
        // Checksum verification stays enabled for this legacy table.
        opts.set_verify_checksums(true);
    }
}

impl ColumnFamilyOptions<TableContext> for InternalMessageStatsOld {
    // Defaults only; this legacy table is slated for removal.
    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
}
443
// TODO should be deleted
pub struct InternalMessageStatsUncommitedOld;
impl ColumnFamily for InternalMessageStatsUncommitedOld {
    const NAME: &'static str = "int_msg_stats_uncommited";

    fn read_options(opts: &mut ReadOptions) {
        // Checksum verification stays enabled for this legacy table.
        opts.set_verify_checksums(true);
    }
}

impl ColumnFamilyOptions<TableContext> for InternalMessageStatsUncommitedOld {
    // Defaults only; this legacy table is slated for removal.
    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
}
457
458fn archive_data_merge(
459    _: &[u8],
460    current_value: Option<&[u8]>,
461    operands: &MergeOperands,
462) -> Option<Vec<u8>> {
463    let total_len = current_value.map(|data| data.len()).unwrap_or_default()
464        + operands.iter().map(|data| data.len()).sum::<usize>();
465
466    let mut result = Vec::with_capacity(total_len);
467
468    if let Some(current_value) = current_value {
469        result.extend_from_slice(current_value);
470    }
471
472    for data in operands {
473        result.extend_from_slice(data);
474    }
475
476    Some(result)
477}
478
479fn block_handle_merge(
480    _: &[u8],
481    current_value: Option<&[u8]>,
482    operands: &MergeOperands,
483) -> Option<Vec<u8>> {
484    let mut value = [0u8; 12];
485    if let Some(current_value) = current_value {
486        value.copy_from_slice(current_value);
487    }
488
489    for operand in operands {
490        assert_eq!(operand.len(), 12);
491        for (a, b) in std::iter::zip(&mut value, operand) {
492            *a |= *b;
493        }
494    }
495
496    Some(value.to_vec())
497}