//! Column family definitions for node storage.
//!
//! Path: `tycho_core/storage/tables.rs`
1use bytesize::ByteSize;
2use tycho_storage::kv::{
3    DEFAULT_MIN_BLOB_SIZE, TableContext, default_block_based_table_factory,
4    optimize_for_level_compaction, optimize_for_point_lookup, refcount, with_blob_db,
5};
6use weedb::rocksdb::{
7    self, BlockBasedIndexType, BlockBasedOptions, CompactionPri, DBCompressionType,
8    DataBlockIndexType, MemtableFactory, MergeOperands, Options, ReadOptions, SliceTransform,
9};
10use weedb::{ColumnFamily, ColumnFamilyOptions};
11
12/// Stores generic node parameters
13/// - Key: `...`
14/// - Value: `...`
15pub struct State;
16
17impl ColumnFamily for State {
18    const NAME: &'static str = "state";
19}
20
21impl ColumnFamilyOptions<TableContext> for State {
22    fn options(opts: &mut Options, ctx: &mut TableContext) {
23        default_block_based_table_factory(opts, ctx);
24
25        opts.set_optimize_filters_for_hits(true);
26        optimize_for_point_lookup(opts, ctx);
27    }
28}
29
30/// Stores prepared archives
31/// - Key: `u32 (BE)` (archive id)
32/// - Value: `Vec<u8>` (archive block ids)
33pub struct ArchiveBlockIds;
34
35impl ColumnFamily for ArchiveBlockIds {
36    const NAME: &'static str = "archive_block_ids";
37}
38
39impl ColumnFamilyOptions<TableContext> for ArchiveBlockIds {
40    fn options(opts: &mut Options, ctx: &mut TableContext) {
41        default_block_based_table_factory(opts, ctx);
42
43        // Uses 128MB * 6 = 768 GB
44        optimize_for_level_compaction(opts, ctx, ByteSize::mib(128), 6);
45
46        opts.set_merge_operator_associative("archive_data_merge", archive_data_merge);
47        // data is hardly compressible and dataset is small
48        opts.set_compression_type(DBCompressionType::None);
49        with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
50    }
51}
52
53/// Stores archive lifetime events.
54/// - Key: `u32 (BE)` (archive id) + `32 (BE)` (event id)
55/// - Value: event data
56pub struct ArchiveEvents;
57
58impl ArchiveEvents {
59    pub const KEY_LEN: usize = 4 + 4;
60}
61
62impl ColumnFamily for ArchiveEvents {
63    const NAME: &'static str = "archive_events";
64
65    fn read_options(opts: &mut rocksdb::ReadOptions) {
66        opts.set_verify_checksums(false);
67    }
68}
69
70impl ColumnFamilyOptions<TableContext> for ArchiveEvents {
71    fn options(opts: &mut rocksdb::Options, ctx: &mut TableContext) {
72        default_block_based_table_factory(opts, ctx);
73    }
74}
75
76/// Maps block root hash to block meta
77/// - Key: `[u8; 32]`
78/// - Value: `BlockMeta`
79pub struct BlockHandles;
80
81impl ColumnFamily for BlockHandles {
82    const NAME: &'static str = "block_handles";
83
84    fn read_options(opts: &mut ReadOptions) {
85        opts.set_verify_checksums(false);
86    }
87}
88
89impl ColumnFamilyOptions<TableContext> for BlockHandles {
90    fn options(opts: &mut Options, ctx: &mut TableContext) {
91        // Uses 128MB * 6 = 768 GB
92        optimize_for_level_compaction(opts, ctx, ByteSize::mib(128), 6);
93
94        opts.set_merge_operator_associative("block_handle_merge", block_handle_merge);
95        optimize_for_point_lookup(opts, ctx);
96    }
97}
98
99/// Maps seqno to key block id
100/// - Key: `u32 (BE)`
101/// - Value: `BlockIdExt`
102pub struct KeyBlocks;
103
104impl ColumnFamily for KeyBlocks {
105    const NAME: &'static str = "key_blocks";
106
107    fn read_options(opts: &mut ReadOptions) {
108        opts.set_verify_checksums(false);
109    }
110}
111
112impl ColumnFamilyOptions<TableContext> for KeyBlocks {}
113
114/// Maps block id (partial) to file hash
115pub struct FullBlockIds;
116
117impl ColumnFamily for FullBlockIds {
118    const NAME: &'static str = "full_block_ids";
119
120    fn read_options(opts: &mut rocksdb::ReadOptions) {
121        opts.set_verify_checksums(false);
122    }
123}
124
125impl ColumnFamilyOptions<TableContext> for FullBlockIds {
126    fn options(opts: &mut rocksdb::Options, ctx: &mut TableContext) {
127        default_block_based_table_factory(opts, ctx);
128    }
129}
130
131/// Maps `BlockId` to root cell hash
132/// - Key: `BlockId`
133/// - Value: `[u8; 32] (state root hash)`
134pub struct ShardStates;
135
136impl ColumnFamily for ShardStates {
137    const NAME: &'static str = "shard_states";
138}
139
140impl ColumnFamilyOptions<TableContext> for ShardStates {
141    fn options(opts: &mut Options, ctx: &mut TableContext) {
142        default_block_based_table_factory(opts, ctx);
143        opts.set_compression_type(DBCompressionType::Zstd);
144    }
145}
146
147/// Stores cells data
148/// - Key: `[u8; 32]` (cell repr hash)
149/// - Value: `StorageCell`
150pub struct Cells;
151
152impl ColumnFamily for Cells {
153    const NAME: &'static str = "cells";
154}
155
156impl ColumnFamilyOptions<TableContext> for Cells {
157    fn options(opts: &mut Options, ctx: &mut TableContext) {
158        opts.set_level_compaction_dynamic_level_bytes(true);
159
160        opts.set_merge_operator_associative("cell_merge", refcount::merge_operator);
161        opts.set_compaction_filter("cell_compaction", refcount::compaction_filter);
162
163        // optimize for bulk inserts and single writer
164
165        // Uses 8 * 512MB = 4GB
166        let buffer_size = ByteSize::mib(512);
167        let buffers_to_merge = 2;
168        let buffer_count = 8;
169        opts.set_write_buffer_size(buffer_size.as_u64() as _);
170        opts.set_max_write_buffer_number(buffer_count);
171        opts.set_min_write_buffer_number_to_merge(buffers_to_merge); // allow early flush
172        ctx.track_buffer_usage(
173            ByteSize(buffer_size.as_u64() * buffers_to_merge as u64),
174            ByteSize(buffer_size.as_u64() * buffer_count as u64),
175        );
176
177        opts.set_max_successive_merges(0); // it will eat cpu, we are doing first merge in hashmap anyway.
178
179        // - Write batch size: 500K entries
180        // - Entry size: ~244 bytes (32 SHA + 8 seq + 192 value + 12 overhead)
181        // - Memtable size: 512MB
182
183        // 1. Entries per memtable = 512MB / 244B ≈ 2.2M entries
184        // 2. Target bucket load factor = 10-12 entries per bucket (RocksDB recommendation)
185        // 3. Bucket count = entries / target_load = 2.2M / 11 ≈ 200K
186        opts.set_memtable_factory(MemtableFactory::HashLinkList {
187            bucket_count: 200_000,
188        });
189
190        opts.set_memtable_prefix_bloom_ratio(0.1); // we use hash-based memtable so bloom filter is not that useful
191        opts.set_bloom_locality(1); // Optimize bloom filter locality
192
193        let mut block_factory = BlockBasedOptions::default();
194
195        // todo: some how make block cache separate for cells,
196        // using 3/4 of all available cache space
197        block_factory.set_block_cache(&ctx.caches().block_cache);
198
199        // 10 bits per key, stored at the end of the sst
200        block_factory.set_bloom_filter(10.0, false);
201        block_factory.set_optimize_filters_for_memory(true);
202        block_factory.set_whole_key_filtering(true);
203
204        // to match fs block size
205        block_factory.set_block_size(4096);
206        block_factory.set_format_version(6);
207
208        // we have 4096 / 256 = 16 keys per block, so binary search is enough
209        block_factory.set_data_block_index_type(DataBlockIndexType::BinarySearch);
210
211        block_factory.set_index_type(BlockBasedIndexType::HashSearch);
212        block_factory.set_pin_l0_filter_and_index_blocks_in_cache(true);
213
214        opts.set_block_based_table_factory(&block_factory);
215        opts.set_prefix_extractor(SliceTransform::create_noop());
216
217        opts.set_memtable_whole_key_filtering(true);
218        opts.set_memtable_prefix_bloom_ratio(0.25);
219
220        opts.set_compression_type(DBCompressionType::None);
221
222        opts.set_compaction_pri(CompactionPri::OldestSmallestSeqFirst);
223        opts.set_level_zero_file_num_compaction_trigger(8);
224
225        opts.set_target_file_size_base(512 * 1024 * 1024); // smaller files for more efficient GC
226
227        opts.set_max_bytes_for_level_base(4 * 1024 * 1024 * 1024); // 4GB per level
228        opts.set_max_bytes_for_level_multiplier(8.0);
229
230        // 512MB per file; less files - less compactions
231        opts.set_target_file_size_base(512 * 1024 * 1024);
232        // L1: 4GB
233        // L2: ~32GB
234        // L3: ~256GB
235        // L4: ~2TB
236        opts.set_num_levels(5);
237
238        opts.set_optimize_filters_for_hits(true);
239
240        // we have our own cache and don't want `kcompactd` goes brrr scenario
241        opts.set_use_direct_reads(true);
242        opts.set_use_direct_io_for_flush_and_compaction(true);
243
244        opts.add_compact_on_deletion_collector_factory(
245            100, // N: examine 100 consecutive entries
246            // Small enough window to detect local delete patterns
247            // Large enough to avoid spurious compactions
248            45, // D: trigger on 45 deletions in window
249            // Balance between the space reclaim and compaction frequency
250            // ~45% deletion density trigger
251            0.5, /* deletion_ratio: trigger if 50% of a total file is deleted
252                  * Backup trigger for overall file health
253                  * Higher than window trigger to prefer local optimization */
254        );
255
256        // single writer optimizations
257        opts.set_enable_write_thread_adaptive_yield(false);
258        opts.set_allow_concurrent_memtable_write(false);
259        opts.set_enable_pipelined_write(true);
260        opts.set_inplace_update_support(false);
261        opts.set_unordered_write(true); // we don't use snapshots
262        opts.set_avoid_unnecessary_blocking_io(true); // schedule unnecessary IO in background;
263
264        opts.set_auto_tuned_ratelimiter(
265            256 * 1024 * 1024, // 256MB/s base rate
266            100_000,           // 100ms refill (standard value)
267            10,                // fairness (standard value)
268        );
269
270        opts.set_periodic_compaction_seconds(3600 * 24); // force compaction once a day
271    }
272}
273
274/// Stores temp cells data
275/// - Key: `ton_types::UInt256` (cell repr hash)
276/// - Value: `StorageCell`
277pub struct TempCells;
278
279impl ColumnFamily for TempCells {
280    const NAME: &'static str = "temp_cells";
281}
282
283impl ColumnFamilyOptions<TableContext> for TempCells {
284    fn options(opts: &mut rocksdb::Options, ctx: &mut TableContext) {
285        let mut block_factory = BlockBasedOptions::default();
286        block_factory.set_block_cache(&ctx.caches().block_cache);
287        block_factory.set_data_block_index_type(DataBlockIndexType::BinaryAndHash);
288        block_factory.set_whole_key_filtering(true);
289        block_factory.set_checksum_type(rocksdb::ChecksumType::NoChecksum);
290
291        block_factory.set_bloom_filter(10.0, false);
292        block_factory.set_block_size(16 * 1024);
293        block_factory.set_format_version(5);
294
295        opts.set_optimize_filters_for_hits(true);
296    }
297}
298
299/// Stores connections data
300/// - Key: `BlockIdShort (16 bytes), [u8; 32] (block root hash), connection type (1 byte)`
301/// - Value: `BlockId (LE)`
302pub struct BlockConnections;
303
304impl BlockConnections {
305    pub const KEY_LEN: usize = 4 + 8 + 4 + 32 + 1;
306}
307
308impl ColumnFamily for BlockConnections {
309    const NAME: &'static str = "block_connections";
310
311    fn read_options(opts: &mut ReadOptions) {
312        opts.set_verify_checksums(false);
313    }
314}
315
316impl ColumnFamilyOptions<TableContext> for BlockConnections {
317    fn options(opts: &mut Options, ctx: &mut TableContext) {
318        default_block_based_table_factory(opts, ctx);
319        optimize_for_point_lookup(opts, ctx);
320    }
321}
322
323// === Old collator storage ===
324
325// TODO should be deleted
326pub struct ShardInternalMessagesOld;
327impl ColumnFamily for ShardInternalMessagesOld {
328    const NAME: &'static str = "shard_int_msgs";
329
330    fn read_options(opts: &mut ReadOptions) {
331        opts.set_verify_checksums(true);
332    }
333}
334
335impl ColumnFamilyOptions<TableContext> for ShardInternalMessagesOld {
336    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
337}
338
339// TODO should be deleted
340pub struct ShardInternalMessagesUncommitedOld;
341impl ColumnFamily for ShardInternalMessagesUncommitedOld {
342    const NAME: &'static str = "shard_int_msgs_uncommited";
343
344    fn read_options(opts: &mut ReadOptions) {
345        opts.set_verify_checksums(true);
346    }
347}
348impl ColumnFamilyOptions<TableContext> for ShardInternalMessagesUncommitedOld {
349    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
350}
351
352// TODO should be deleted
353pub struct InternalMessageStatsOld;
354impl ColumnFamily for InternalMessageStatsOld {
355    const NAME: &'static str = "int_msg_stats";
356
357    fn read_options(opts: &mut ReadOptions) {
358        opts.set_verify_checksums(true);
359    }
360}
361
362impl ColumnFamilyOptions<TableContext> for InternalMessageStatsOld {
363    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
364}
365
366// TODO should be deleted
367pub struct InternalMessageStatsUncommitedOld;
368impl ColumnFamily for InternalMessageStatsUncommitedOld {
369    const NAME: &'static str = "int_msg_stats_uncommited";
370
371    fn read_options(opts: &mut ReadOptions) {
372        opts.set_verify_checksums(true);
373    }
374}
375
376impl ColumnFamilyOptions<TableContext> for InternalMessageStatsUncommitedOld {
377    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
378}
379
380fn archive_data_merge(
381    _: &[u8],
382    current_value: Option<&[u8]>,
383    operands: &MergeOperands,
384) -> Option<Vec<u8>> {
385    let total_len = current_value.map(|data| data.len()).unwrap_or_default()
386        + operands.iter().map(|data| data.len()).sum::<usize>();
387
388    let mut result = Vec::with_capacity(total_len);
389
390    if let Some(current_value) = current_value {
391        result.extend_from_slice(current_value);
392    }
393
394    for data in operands {
395        result.extend_from_slice(data);
396    }
397
398    Some(result)
399}
400
401fn block_handle_merge(
402    _: &[u8],
403    current_value: Option<&[u8]>,
404    operands: &MergeOperands,
405) -> Option<Vec<u8>> {
406    let mut value = [0u8; 12];
407    if let Some(current_value) = current_value {
408        value.copy_from_slice(current_value);
409    }
410
411    for operand in operands {
412        assert_eq!(operand.len(), 12);
413        for (a, b) in std::iter::zip(&mut value, operand) {
414            *a |= *b;
415        }
416    }
417
418    Some(value.to_vec())
419}