tycho_collator/storage/
tables.rs1use bytesize::ByteSize;
2use tycho_storage::kv::{
3 DEFAULT_MIN_BLOB_SIZE, TableContext, default_block_based_table_factory,
4 optimize_for_point_lookup, with_blob_db,
5};
6use weedb::rocksdb::{BlockBasedOptions, DBCompressionType, Options, ReadOptions};
7use weedb::{ColumnFamily, ColumnFamilyOptions};
8
9pub struct ShardInternalMessages;
11
12impl ColumnFamily for ShardInternalMessages {
13 const NAME: &'static str = "shard_int_messages";
14
15 fn read_options(opts: &mut ReadOptions) {
16 opts.set_verify_checksums(true);
17 }
18}
19
20impl ColumnFamilyOptions<TableContext> for ShardInternalMessages {
21 fn options(opts: &mut Options, ctx: &mut TableContext) {
22 internal_queue_options(opts, ctx);
23 with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
24 }
25}
26
27pub struct InternalMessageStatistics;
30
31impl ColumnFamily for InternalMessageStatistics {
32 const NAME: &'static str = "int_msg_statistics";
33
34 fn read_options(opts: &mut ReadOptions) {
35 opts.set_verify_checksums(true);
36 }
37}
38
39impl ColumnFamilyOptions<TableContext> for InternalMessageStatistics {
40 fn options(opts: &mut Options, ctx: &mut TableContext) {
41 internal_queue_options(opts, ctx);
42 }
43}
44
45pub struct InternalMessageVar;
48
49impl ColumnFamily for InternalMessageVar {
50 const NAME: &'static str = "int_msg_var";
51
52 fn read_options(opts: &mut ReadOptions) {
53 opts.set_verify_checksums(true);
54 }
55}
56
57impl ColumnFamilyOptions<TableContext> for InternalMessageVar {
58 fn options(opts: &mut Options, ctx: &mut TableContext) {
59 default_block_based_table_factory(opts, ctx);
60 opts.set_optimize_filters_for_hits(true);
61 optimize_for_point_lookup(opts, ctx);
62 }
63}
64
65pub struct InternalMessageDiffsTail;
68
69impl ColumnFamily for InternalMessageDiffsTail {
70 const NAME: &'static str = "int_msg_diffs_tail";
71
72 fn read_options(opts: &mut ReadOptions) {
73 opts.set_verify_checksums(true);
74 }
75}
76
77impl ColumnFamilyOptions<TableContext> for InternalMessageDiffsTail {
78 fn options(opts: &mut Options, ctx: &mut TableContext) {
79 internal_queue_options(opts, ctx);
80 }
81}
82
83pub struct InternalMessageDiffInfo;
86
87impl ColumnFamily for InternalMessageDiffInfo {
88 const NAME: &'static str = "int_msg_diff_info";
89
90 fn read_options(opts: &mut ReadOptions) {
91 opts.set_verify_checksums(true);
92 }
93}
94
95impl ColumnFamilyOptions<TableContext> for InternalMessageDiffInfo {
96 fn options(opts: &mut Options, ctx: &mut TableContext) {
97 internal_queue_options(opts, ctx);
98 }
99}
100
101pub struct InternalMessageCommitPointer;
104
105impl ColumnFamily for InternalMessageCommitPointer {
106 const NAME: &'static str = "int_msg_commit_pointer";
107
108 fn read_options(opts: &mut ReadOptions) {
109 opts.set_verify_checksums(true);
110 }
111}
112
113impl ColumnFamilyOptions<TableContext> for InternalMessageCommitPointer {
114 fn options(opts: &mut Options, ctx: &mut TableContext) {
115 default_block_based_table_factory(opts, ctx);
116 opts.set_optimize_filters_for_hits(true);
117 optimize_for_point_lookup(opts, ctx);
118 }
119}
120
121fn internal_queue_options(opts: &mut Options, ctx: &mut TableContext) {
124 let mut block_factory = BlockBasedOptions::default();
125 block_factory.set_block_cache(&ctx.caches().block_cache);
126 block_factory.set_format_version(6);
127
128 opts.set_block_based_table_factory(&block_factory);
129 opts.set_disable_auto_compactions(true);
130 opts.set_compression_type(DBCompressionType::None);
131
132 opts.set_level_compaction_dynamic_level_bytes(true);
133
134 let buffer_size = ByteSize::mib(256);
136 let buffers_to_merge = 2;
137 let buffer_count = 2;
138 opts.set_write_buffer_size(buffer_size.as_u64() as _);
139 opts.set_max_write_buffer_number(buffer_count);
140 opts.set_min_write_buffer_number_to_merge(buffers_to_merge); ctx.track_buffer_usage(
142 ByteSize(buffer_size.as_u64() * buffers_to_merge as u64),
143 ByteSize(buffer_size.as_u64() * buffer_count as u64),
144 );
145}