tycho_core/storage/
tables.rs1use bytesize::ByteSize;
2use tycho_storage::kv::{
3 DEFAULT_MIN_BLOB_SIZE, TableContext, default_block_based_table_factory,
4 optimize_for_level_compaction, optimize_for_point_lookup, refcount, with_blob_db,
5};
6use weedb::rocksdb::{
7 self, BlockBasedIndexType, BlockBasedOptions, CompactionPri, DBCompressionType,
8 DataBlockIndexType, MemtableFactory, MergeOperands, Options, ReadOptions, SliceTransform,
9};
10use weedb::{ColumnFamily, ColumnFamilyOptions};
11
/// Unit type identifying the `state` column family.
pub struct State;

impl ColumnFamily for State {
    /// Column family name.
    const NAME: &'static str = "state";
}

impl ColumnFamilyOptions<TableContext> for State {
    /// Configures the RocksDB options used for the `state` column family.
    ///
    /// The calls are applied in order: shared block-based table setup first,
    /// then the filter hint, then the point-lookup helper.
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        default_block_based_table_factory(opts, ctx);

        opts.set_optimize_filters_for_hits(true);
        // NOTE(review): this helper runs last and may override settings made
        // above — confirm against its definition in `tycho_storage::kv`.
        optimize_for_point_lookup(opts, ctx);
    }
}
29
/// Unit type identifying the `archive_block_ids` column family.
pub struct ArchiveBlockIds;

impl ColumnFamily for ArchiveBlockIds {
    /// Column family name.
    const NAME: &'static str = "archive_block_ids";
}

impl ColumnFamilyOptions<TableContext> for ArchiveBlockIds {
    /// Configures the RocksDB options used for the `archive_block_ids` column family.
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        default_block_based_table_factory(opts, ctx);

        // Level-compaction helper called with a 128 MiB size and the value 6.
        // NOTE(review): the meaning of the last two arguments is defined by the
        // helper in `tycho_storage::kv` — confirm there.
        optimize_for_level_compaction(opts, ctx, ByteSize::mib(128), 6);

        // Merges are resolved by `archive_data_merge`, which concatenates the
        // existing value and all operands.
        opts.set_merge_operator_associative("archive_data_merge", archive_data_merge);
        // Compression is disabled both for the column family itself and for
        // the blob storage enabled by `with_blob_db`.
        opts.set_compression_type(DBCompressionType::None);
        with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
    }
}
52
53pub struct ArchiveEvents;
57
58impl ArchiveEvents {
59 pub const KEY_LEN: usize = 4 + 4;
60}
61
62impl ColumnFamily for ArchiveEvents {
63 const NAME: &'static str = "archive_events";
64
65 fn read_options(opts: &mut rocksdb::ReadOptions) {
66 opts.set_verify_checksums(false);
67 }
68}
69
70impl ColumnFamilyOptions<TableContext> for ArchiveEvents {
71 fn options(opts: &mut rocksdb::Options, ctx: &mut TableContext) {
72 default_block_based_table_factory(opts, ctx);
73 }
74}
75
/// Unit type identifying the `block_handles` column family.
pub struct BlockHandles;

impl ColumnFamily for BlockHandles {
    /// Column family name.
    const NAME: &'static str = "block_handles";

    /// Reads from this column family skip checksum verification.
    fn read_options(opts: &mut ReadOptions) {
        opts.set_verify_checksums(false);
    }
}

impl ColumnFamilyOptions<TableContext> for BlockHandles {
    /// Configures the RocksDB options used for the `block_handles` column family.
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        // Level-compaction helper called with a 128 MiB size and the value 6.
        // NOTE(review): argument semantics are defined by the helper — confirm there.
        optimize_for_level_compaction(opts, ctx, ByteSize::mib(128), 6);

        // Merges are resolved by `block_handle_merge`, which ORs 12-byte
        // values together byte by byte.
        opts.set_merge_operator_associative("block_handle_merge", block_handle_merge);
        optimize_for_point_lookup(opts, ctx);
    }
}
98
/// Unit type identifying the `key_blocks` column family.
pub struct KeyBlocks;

impl ColumnFamily for KeyBlocks {
    /// Column family name.
    const NAME: &'static str = "key_blocks";

    /// Reads from this column family skip checksum verification.
    fn read_options(opts: &mut ReadOptions) {
        opts.set_verify_checksums(false);
    }
}

// No overrides here: whatever `ColumnFamilyOptions` provides by default is used.
impl ColumnFamilyOptions<TableContext> for KeyBlocks {}
113
114pub struct FullBlockIds;
116
117impl ColumnFamily for FullBlockIds {
118 const NAME: &'static str = "full_block_ids";
119
120 fn read_options(opts: &mut rocksdb::ReadOptions) {
121 opts.set_verify_checksums(false);
122 }
123}
124
125impl ColumnFamilyOptions<TableContext> for FullBlockIds {
126 fn options(opts: &mut rocksdb::Options, ctx: &mut TableContext) {
127 default_block_based_table_factory(opts, ctx);
128 }
129}
130
/// Unit type identifying the `shard_states` column family.
pub struct ShardStates;

impl ColumnFamily for ShardStates {
    /// Column family name.
    const NAME: &'static str = "shard_states";
}

impl ColumnFamilyOptions<TableContext> for ShardStates {
    /// Applies the shared block-based table setup, then selects Zstd compression.
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        default_block_based_table_factory(opts, ctx);
        // Set after the helper so this choice is the one that sticks.
        opts.set_compression_type(DBCompressionType::Zstd);
    }
}
146
/// Unit type identifying the `cells` column family.
pub struct Cells;

impl ColumnFamily for Cells {
    /// Column family name.
    const NAME: &'static str = "cells";
}

impl ColumnFamilyOptions<TableContext> for Cells {
    /// Configures the RocksDB options used for the `cells` column family.
    ///
    /// This is a long, hand-tuned sequence. Statement order matters because a
    /// few options are assigned more than once and the last assignment wins.
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        opts.set_level_compaction_dynamic_level_bytes(true);

        // Values are combined and cleaned up by the `refcount` helpers from
        // `tycho_storage::kv`: one as merge operator, one as compaction filter.
        opts.set_merge_operator_associative("cell_merge", refcount::merge_operator);
        opts.set_compaction_filter("cell_compaction", refcount::compaction_filter);

        // Write buffers: 512 MiB each, at most 8, merged in groups of at least 2.
        let buffer_size = ByteSize::mib(512);
        let buffers_to_merge = 2;
        let buffer_count = 8;
        opts.set_write_buffer_size(buffer_size.as_u64() as _);
        opts.set_max_write_buffer_number(buffer_count);
        opts.set_min_write_buffer_number_to_merge(buffers_to_merge);
        // Reports 2 x 512 MiB and 8 x 512 MiB to the context.
        // NOTE(review): presumably (min, max) buffer usage — confirm against
        // `TableContext::track_buffer_usage`.
        ctx.track_buffer_usage(
            ByteSize(buffer_size.as_u64() * buffers_to_merge as u64),
            ByteSize(buffer_size.as_u64() * buffer_count as u64),
        );

        opts.set_max_successive_merges(0);
        // Hash-linked-list memtable with 200k buckets.
        opts.set_memtable_factory(MemtableFactory::HashLinkList {
            bucket_count: 200_000,
        });

        // NOTE(review): this 0.1 ratio is overwritten by the 0.25 assignment
        // further down — confirm which value is intended.
        opts.set_memtable_prefix_bloom_ratio(0.1);
        opts.set_bloom_locality(1);
        let mut block_factory = BlockBasedOptions::default();

        // Share the block cache provided by the context.
        block_factory.set_block_cache(&ctx.caches().block_cache);

        block_factory.set_bloom_filter(10.0, false);
        block_factory.set_optimize_filters_for_memory(true);
        block_factory.set_whole_key_filtering(true);

        block_factory.set_block_size(4096);
        block_factory.set_format_version(6);

        block_factory.set_data_block_index_type(DataBlockIndexType::BinarySearch);

        block_factory.set_index_type(BlockBasedIndexType::HashSearch);
        block_factory.set_pin_l0_filter_and_index_blocks_in_cache(true);

        // Install the table factory configured above.
        opts.set_block_based_table_factory(&block_factory);
        opts.set_prefix_extractor(SliceTransform::create_noop());

        opts.set_memtable_whole_key_filtering(true);
        // Effective memtable bloom ratio (replaces the 0.1 set earlier).
        opts.set_memtable_prefix_bloom_ratio(0.25);

        opts.set_compression_type(DBCompressionType::None);

        opts.set_compaction_pri(CompactionPri::OldestSmallestSeqFirst);
        opts.set_level_zero_file_num_compaction_trigger(8);

        // 512 MiB target file size, 4 GiB level base, 8x level multiplier.
        opts.set_target_file_size_base(512 * 1024 * 1024);
        opts.set_max_bytes_for_level_base(4 * 1024 * 1024 * 1024);
        opts.set_max_bytes_for_level_multiplier(8.0);

        // NOTE(review): repeats the identical `set_target_file_size_base` call
        // made a few lines above; harmless but redundant.
        opts.set_target_file_size_base(512 * 1024 * 1024);
        opts.set_num_levels(5);

        opts.set_optimize_filters_for_hits(true);

        // NOTE(review): direct I/O and the write-path / rate-limiter settings
        // below look like database-wide rather than per-column-family options —
        // confirm they take effect when set through column family options.
        opts.set_use_direct_reads(true);
        opts.set_use_direct_io_for_flush_and_compaction(true);

        // NOTE(review): arguments are presumably (window size, deletion
        // trigger, deletion ratio) — confirm against the rust-rocksdb API.
        opts.add_compact_on_deletion_collector_factory(
            100,
            45,
            0.5,
        );

        opts.set_enable_write_thread_adaptive_yield(false);
        opts.set_allow_concurrent_memtable_write(false);
        opts.set_enable_pipelined_write(true);
        opts.set_inplace_update_support(false);
        opts.set_unordered_write(true);
        opts.set_avoid_unnecessary_blocking_io(true);
        // NOTE(review): arguments are presumably (bytes per second, refill
        // period in microseconds, fairness) — confirm against the rust-rocksdb API.
        opts.set_auto_tuned_ratelimiter(
            256 * 1024 * 1024,
            100_000,
            10,
        );

        // 3600 * 24 seconds = one day.
        opts.set_periodic_compaction_seconds(3600 * 24);
    }
}
273
/// Unit type identifying the `temp_cells` column family.
pub struct TempCells;

impl ColumnFamily for TempCells {
    /// Column family name.
    const NAME: &'static str = "temp_cells";
}
282
283impl ColumnFamilyOptions<TableContext> for TempCells {
284 fn options(opts: &mut rocksdb::Options, ctx: &mut TableContext) {
285 let mut block_factory = BlockBasedOptions::default();
286 block_factory.set_block_cache(&ctx.caches().block_cache);
287 block_factory.set_data_block_index_type(DataBlockIndexType::BinaryAndHash);
288 block_factory.set_whole_key_filtering(true);
289 block_factory.set_checksum_type(rocksdb::ChecksumType::NoChecksum);
290
291 block_factory.set_bloom_filter(10.0, false);
292 block_factory.set_block_size(16 * 1024);
293 block_factory.set_format_version(5);
294
295 opts.set_optimize_filters_for_hits(true);
296 }
297}
298
/// Unit type identifying the `block_connections` column family.
pub struct BlockConnections;

impl BlockConnections {
    /// Key length, written as the sum of its parts: 4 + 8 + 4 + 32 + 1 = 49
    /// (presumably bytes — confirm against the code that builds the keys).
    pub const KEY_LEN: usize = 4 + 8 + 4 + 32 + 1;
}

impl ColumnFamily for BlockConnections {
    /// Column family name.
    const NAME: &'static str = "block_connections";

    /// Reads from this column family skip checksum verification.
    fn read_options(opts: &mut ReadOptions) {
        opts.set_verify_checksums(false);
    }
}

impl ColumnFamilyOptions<TableContext> for BlockConnections {
    /// Applies the shared block-based table setup, then the point-lookup helper.
    fn options(opts: &mut Options, ctx: &mut TableContext) {
        default_block_based_table_factory(opts, ctx);
        optimize_for_point_lookup(opts, ctx);
    }
}
322
/// Unit type identifying the `shard_int_msgs` column family.
///
/// NOTE(review): the `Old` suffix suggests a legacy table kept for
/// compatibility — confirm with the code that opens the database.
pub struct ShardInternalMessagesOld;
impl ColumnFamily for ShardInternalMessagesOld {
    /// Column family name.
    const NAME: &'static str = "shard_int_msgs";

    /// Reads from this column family verify checksums.
    fn read_options(opts: &mut ReadOptions) {
        opts.set_verify_checksums(true);
    }
}

impl ColumnFamilyOptions<TableContext> for ShardInternalMessagesOld {
    /// Leaves `opts` and `ctx` untouched.
    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
}
338
/// Unit type identifying the `shard_int_msgs_uncommited` column family.
///
/// NOTE(review): the `Old` suffix suggests a legacy table kept for
/// compatibility — confirm with the code that opens the database.
pub struct ShardInternalMessagesUncommitedOld;
impl ColumnFamily for ShardInternalMessagesUncommitedOld {
    /// Column family name (the spelling is part of the on-disk name).
    const NAME: &'static str = "shard_int_msgs_uncommited";

    /// Reads from this column family verify checksums.
    fn read_options(opts: &mut ReadOptions) {
        opts.set_verify_checksums(true);
    }
}
impl ColumnFamilyOptions<TableContext> for ShardInternalMessagesUncommitedOld {
    /// Leaves `opts` and `ctx` untouched.
    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
}
351
/// Unit type identifying the `int_msg_stats` column family.
///
/// NOTE(review): the `Old` suffix suggests a legacy table kept for
/// compatibility — confirm with the code that opens the database.
pub struct InternalMessageStatsOld;
impl ColumnFamily for InternalMessageStatsOld {
    /// Column family name.
    const NAME: &'static str = "int_msg_stats";

    /// Reads from this column family verify checksums.
    fn read_options(opts: &mut ReadOptions) {
        opts.set_verify_checksums(true);
    }
}

impl ColumnFamilyOptions<TableContext> for InternalMessageStatsOld {
    /// Leaves `opts` and `ctx` untouched.
    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
}
365
/// Unit type identifying the `int_msg_stats_uncommited` column family.
///
/// NOTE(review): the `Old` suffix suggests a legacy table kept for
/// compatibility — confirm with the code that opens the database.
pub struct InternalMessageStatsUncommitedOld;
impl ColumnFamily for InternalMessageStatsUncommitedOld {
    /// Column family name (the spelling is part of the on-disk name).
    const NAME: &'static str = "int_msg_stats_uncommited";

    /// Reads from this column family verify checksums.
    fn read_options(opts: &mut ReadOptions) {
        opts.set_verify_checksums(true);
    }
}

impl ColumnFamilyOptions<TableContext> for InternalMessageStatsUncommitedOld {
    /// Leaves `opts` and `ctx` untouched.
    fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
}
379
380fn archive_data_merge(
381 _: &[u8],
382 current_value: Option<&[u8]>,
383 operands: &MergeOperands,
384) -> Option<Vec<u8>> {
385 let total_len = current_value.map(|data| data.len()).unwrap_or_default()
386 + operands.iter().map(|data| data.len()).sum::<usize>();
387
388 let mut result = Vec::with_capacity(total_len);
389
390 if let Some(current_value) = current_value {
391 result.extend_from_slice(current_value);
392 }
393
394 for data in operands {
395 result.extend_from_slice(data);
396 }
397
398 Some(result)
399}
400
401fn block_handle_merge(
402 _: &[u8],
403 current_value: Option<&[u8]>,
404 operands: &MergeOperands,
405) -> Option<Vec<u8>> {
406 let mut value = [0u8; 12];
407 if let Some(current_value) = current_value {
408 value.copy_from_slice(current_value);
409 }
410
411 for operand in operands {
412 assert_eq!(operand.len(), 12);
413 for (a, b) in std::iter::zip(&mut value, operand) {
414 *a |= *b;
415 }
416 }
417
418 Some(value.to_vec())
419}