Skip to main content

tycho_collator/storage/
tables.rs

1use bytesize::ByteSize;
2use tycho_storage::kv::{
3    DEFAULT_MIN_BLOB_SIZE, TableContext, default_block_based_table_factory,
4    optimize_for_point_lookup, with_blob_db,
5};
6use weedb::rocksdb::{BlockBasedOptions, DBCompressionType, Options, ReadOptions};
7use weedb::{ColumnFamily, ColumnFamilyOptions};
8
9/// Stores persistent internal messages
10pub struct ShardInternalMessages;
11
12impl ColumnFamily for ShardInternalMessages {
13    const NAME: &'static str = "shard_int_messages";
14
15    fn read_options(opts: &mut ReadOptions) {
16        opts.set_verify_checksums(true);
17    }
18}
19
20impl ColumnFamilyOptions<TableContext> for ShardInternalMessages {
21    fn options(opts: &mut Options, ctx: &mut TableContext) {
22        internal_queue_options(opts, ctx);
23        with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
24    }
25}
26
27//
28
29pub struct InternalMessageStatistics;
30
31impl ColumnFamily for InternalMessageStatistics {
32    const NAME: &'static str = "int_msg_statistics";
33
34    fn read_options(opts: &mut ReadOptions) {
35        opts.set_verify_checksums(true);
36    }
37}
38
39impl ColumnFamilyOptions<TableContext> for InternalMessageStatistics {
40    fn options(opts: &mut Options, ctx: &mut TableContext) {
41        internal_queue_options(opts, ctx);
42    }
43}
44
45//
46
47pub struct InternalMessageVar;
48
49impl ColumnFamily for InternalMessageVar {
50    const NAME: &'static str = "int_msg_var";
51
52    fn read_options(opts: &mut ReadOptions) {
53        opts.set_verify_checksums(true);
54    }
55}
56
57impl ColumnFamilyOptions<TableContext> for InternalMessageVar {
58    fn options(opts: &mut Options, ctx: &mut TableContext) {
59        default_block_based_table_factory(opts, ctx);
60        opts.set_optimize_filters_for_hits(true);
61        optimize_for_point_lookup(opts, ctx);
62    }
63}
64
65//
66
67pub struct InternalMessageDiffsTail;
68
69impl ColumnFamily for InternalMessageDiffsTail {
70    const NAME: &'static str = "int_msg_diffs_tail";
71
72    fn read_options(opts: &mut ReadOptions) {
73        opts.set_verify_checksums(true);
74    }
75}
76
77impl ColumnFamilyOptions<TableContext> for InternalMessageDiffsTail {
78    fn options(opts: &mut Options, ctx: &mut TableContext) {
79        internal_queue_options(opts, ctx);
80    }
81}
82
83//
84
85pub struct InternalMessageDiffInfo;
86
87impl ColumnFamily for InternalMessageDiffInfo {
88    const NAME: &'static str = "int_msg_diff_info";
89
90    fn read_options(opts: &mut ReadOptions) {
91        opts.set_verify_checksums(true);
92    }
93}
94
95impl ColumnFamilyOptions<TableContext> for InternalMessageDiffInfo {
96    fn options(opts: &mut Options, ctx: &mut TableContext) {
97        internal_queue_options(opts, ctx);
98    }
99}
100
101//
102
103pub struct InternalMessageCommitPointer;
104
105impl ColumnFamily for InternalMessageCommitPointer {
106    const NAME: &'static str = "int_msg_commit_pointer";
107
108    fn read_options(opts: &mut ReadOptions) {
109        opts.set_verify_checksums(true);
110    }
111}
112
113impl ColumnFamilyOptions<TableContext> for InternalMessageCommitPointer {
114    fn options(opts: &mut Options, ctx: &mut TableContext) {
115        default_block_based_table_factory(opts, ctx);
116        opts.set_optimize_filters_for_hits(true);
117        optimize_for_point_lookup(opts, ctx);
118    }
119}
120
121// === Helpers ===
122
123fn internal_queue_options(opts: &mut Options, ctx: &mut TableContext) {
124    let mut block_factory = BlockBasedOptions::default();
125    block_factory.set_block_cache(&ctx.caches().block_cache);
126    block_factory.set_format_version(6);
127
128    opts.set_block_based_table_factory(&block_factory);
129    opts.set_disable_auto_compactions(true);
130    opts.set_compression_type(DBCompressionType::None);
131
132    opts.set_level_compaction_dynamic_level_bytes(true);
133
134    // optimize for bulk inserts and single writer
135    let buffer_size = ByteSize::mib(256);
136    let buffers_to_merge = 2;
137    let buffer_count = 2;
138    opts.set_write_buffer_size(buffer_size.as_u64() as _);
139    opts.set_max_write_buffer_number(buffer_count);
140    opts.set_min_write_buffer_number_to_merge(buffers_to_merge); // allow early flush
141    ctx.track_buffer_usage(
142        ByteSize(buffer_size.as_u64() * buffers_to_merge as u64),
143        ByteSize(buffer_size.as_u64() * buffer_count as u64),
144    );
145}