1use bytesize::ByteSize;
2use tycho_storage::kv::{
3 DEFAULT_MIN_BLOB_SIZE, TableContext, default_block_based_table_factory,
4 optimize_for_level_compaction, optimize_for_point_lookup, refcount, with_blob_db,
5};
6use weedb::rocksdb::{
7 self, BlockBasedIndexType, BlockBasedOptions, CompactionPri, DBCompressionType,
8 DataBlockIndexType, MemtableFactory, MergeOperands, Options, ReadOptions, SliceTransform,
9};
10use weedb::{ColumnFamily, ColumnFamilyOptions};
11
12pub struct State;
16
17impl ColumnFamily for State {
18 const NAME: &'static str = "state";
19}
20
21impl ColumnFamilyOptions<TableContext> for State {
22 fn options(opts: &mut Options, ctx: &mut TableContext) {
23 default_block_based_table_factory(opts, ctx);
24
25 opts.set_optimize_filters_for_hits(true);
26 optimize_for_point_lookup(opts, ctx);
27 }
28}
29
30pub struct ArchiveBlockIds;
34
35impl ColumnFamily for ArchiveBlockIds {
36 const NAME: &'static str = "archive_block_ids";
37}
38
39impl ColumnFamilyOptions<TableContext> for ArchiveBlockIds {
40 fn options(opts: &mut Options, ctx: &mut TableContext) {
41 default_block_based_table_factory(opts, ctx);
42
43 optimize_for_level_compaction(opts, ctx, ByteSize::mib(128), 6);
45
46 opts.set_merge_operator_associative("archive_data_merge", archive_data_merge);
47 opts.set_compression_type(DBCompressionType::None);
49 with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
50 }
51}
52
/// Marker type for the `archives` column family.
pub struct Archives;

impl Archives {
    /// Key length constant for this table: a 4-unit part followed by an
    /// 8-unit part, 12 in total.
    ///
    /// NOTE(review): presumably these are byte widths of two key fields;
    /// confirm the field meanings against the code that builds the keys.
    pub const KEY_LEN: usize = 4 + 8;
}
61
62impl ColumnFamily for Archives {
63 const NAME: &'static str = "archives";
64}
65
66impl ColumnFamilyOptions<TableContext> for Archives {
67 fn options(opts: &mut Options, ctx: &mut TableContext) {
68 default_block_based_table_factory(opts, ctx);
69
70 optimize_for_level_compaction(opts, ctx, ByteSize::mib(512), 8);
72
73 opts.set_compression_type(DBCompressionType::None);
75 with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
76 }
77}
78
79pub struct BlockHandles;
83
84impl ColumnFamily for BlockHandles {
85 const NAME: &'static str = "block_handles";
86
87 fn read_options(opts: &mut ReadOptions) {
88 opts.set_verify_checksums(false);
89 }
90}
91
92impl ColumnFamilyOptions<TableContext> for BlockHandles {
93 fn options(opts: &mut Options, ctx: &mut TableContext) {
94 optimize_for_level_compaction(opts, ctx, ByteSize::mib(128), 6);
96
97 opts.set_merge_operator_associative("block_handle_merge", block_handle_merge);
98 optimize_for_point_lookup(opts, ctx);
99 }
100}
101
102pub struct KeyBlocks;
106
107impl ColumnFamily for KeyBlocks {
108 const NAME: &'static str = "key_blocks";
109
110 fn read_options(opts: &mut ReadOptions) {
111 opts.set_verify_checksums(false);
112 }
113}
114
115impl ColumnFamilyOptions<TableContext> for KeyBlocks {}
116
117pub struct FullBlockIds;
119
120impl ColumnFamily for FullBlockIds {
121 const NAME: &'static str = "full_block_ids";
122
123 fn read_options(opts: &mut rocksdb::ReadOptions) {
124 opts.set_verify_checksums(false);
125 }
126}
127
128impl ColumnFamilyOptions<TableContext> for FullBlockIds {
129 fn options(opts: &mut rocksdb::Options, ctx: &mut TableContext) {
130 default_block_based_table_factory(opts, ctx);
131 }
132}
133
/// Marker type for the `package_entries` column family.
pub struct PackageEntries;

impl PackageEntries {
    /// Key length constant for this table: parts of 4, 8, 4, 32 and 1,
    /// 49 in total.
    ///
    /// NOTE(review): presumably these are byte widths of the key fields;
    /// confirm the field meanings against the code that builds the keys.
    pub const KEY_LEN: usize = 4 + 8 + 4 + 32 + 1;
}
142
143impl ColumnFamily for PackageEntries {
144 const NAME: &'static str = "package_entries";
145}
146
147impl ColumnFamilyOptions<TableContext> for PackageEntries {
148 fn options(opts: &mut Options, ctx: &mut TableContext) {
149 default_block_based_table_factory(opts, ctx);
150 opts.set_compression_type(DBCompressionType::Zstd);
151
152 let buffer_size = ByteSize::mib(512);
154 let buffers_to_merge = 2;
155 let buffer_count = 8;
156 opts.set_write_buffer_size(buffer_size.as_u64() as _);
157 opts.set_max_write_buffer_number(buffer_count);
158 opts.set_min_write_buffer_number_to_merge(buffers_to_merge); ctx.track_buffer_usage(
160 ByteSize(buffer_size.as_u64() * buffers_to_merge as u64),
161 ByteSize(buffer_size.as_u64() * buffer_count as u64),
162 );
163
164 with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::Zstd);
165
166 opts.set_optimize_filters_for_hits(true);
180 }
181}
182
/// Marker type for the `block_data_entries` column family.
pub struct BlockDataEntries;

impl BlockDataEntries {
    /// Key length constant for this table: parts of 4, 8, 4, 32 and 4,
    /// 52 in total.
    ///
    /// NOTE(review): presumably these are byte widths of the key fields;
    /// confirm the field meanings against the code that builds the keys.
    pub const KEY_LEN: usize = 4 + 8 + 4 + 32 + 4;
}
191
192impl ColumnFamily for BlockDataEntries {
193 const NAME: &'static str = "block_data_entries";
194}
195
196impl ColumnFamilyOptions<TableContext> for BlockDataEntries {
197 fn options(opts: &mut Options, ctx: &mut TableContext) {
198 default_block_based_table_factory(opts, ctx);
199
200 optimize_for_level_compaction(opts, ctx, ByteSize::mib(127), 6);
202
203 opts.set_compression_type(DBCompressionType::None);
205 with_blob_db(opts, DEFAULT_MIN_BLOB_SIZE, DBCompressionType::None);
206 }
207}
208
209pub struct ShardStates;
213
214impl ColumnFamily for ShardStates {
215 const NAME: &'static str = "shard_states";
216}
217
218impl ColumnFamilyOptions<TableContext> for ShardStates {
219 fn options(opts: &mut Options, ctx: &mut TableContext) {
220 default_block_based_table_factory(opts, ctx);
221 opts.set_compression_type(DBCompressionType::Zstd);
222 }
223}
224
225pub struct Cells;
229
230impl ColumnFamily for Cells {
231 const NAME: &'static str = "cells";
232}
233
234impl ColumnFamilyOptions<TableContext> for Cells {
235 fn options(opts: &mut Options, ctx: &mut TableContext) {
236 opts.set_level_compaction_dynamic_level_bytes(true);
237
238 opts.set_merge_operator_associative("cell_merge", refcount::merge_operator);
239 opts.set_compaction_filter("cell_compaction", refcount::compaction_filter);
240
241 let buffer_size = ByteSize::mib(512);
245 let buffers_to_merge = 2;
246 let buffer_count = 8;
247 opts.set_write_buffer_size(buffer_size.as_u64() as _);
248 opts.set_max_write_buffer_number(buffer_count);
249 opts.set_min_write_buffer_number_to_merge(buffers_to_merge); ctx.track_buffer_usage(
251 ByteSize(buffer_size.as_u64() * buffers_to_merge as u64),
252 ByteSize(buffer_size.as_u64() * buffer_count as u64),
253 );
254
255 opts.set_max_successive_merges(0); opts.set_memtable_factory(MemtableFactory::HashLinkList {
265 bucket_count: 200_000,
266 });
267
268 opts.set_memtable_prefix_bloom_ratio(0.1); opts.set_bloom_locality(1); let mut block_factory = BlockBasedOptions::default();
272
273 block_factory.set_block_cache(&ctx.caches().block_cache);
276
277 block_factory.set_bloom_filter(10.0, false);
279 block_factory.set_optimize_filters_for_memory(true);
280 block_factory.set_whole_key_filtering(true);
281
282 block_factory.set_block_size(4096);
284 block_factory.set_format_version(6);
285
286 block_factory.set_data_block_index_type(DataBlockIndexType::BinarySearch);
288
289 block_factory.set_index_type(BlockBasedIndexType::HashSearch);
290 block_factory.set_pin_l0_filter_and_index_blocks_in_cache(true);
291
292 opts.set_block_based_table_factory(&block_factory);
293 opts.set_prefix_extractor(SliceTransform::create_noop());
294
295 opts.set_memtable_whole_key_filtering(true);
296 opts.set_memtable_prefix_bloom_ratio(0.25);
297
298 opts.set_compression_type(DBCompressionType::None);
299
300 opts.set_compaction_pri(CompactionPri::OldestSmallestSeqFirst);
301 opts.set_level_zero_file_num_compaction_trigger(8);
302
303 opts.set_target_file_size_base(512 * 1024 * 1024); opts.set_max_bytes_for_level_base(4 * 1024 * 1024 * 1024); opts.set_max_bytes_for_level_multiplier(8.0);
307
308 opts.set_target_file_size_base(512 * 1024 * 1024);
310 opts.set_num_levels(5);
315
316 opts.set_optimize_filters_for_hits(true);
317
318 opts.set_use_direct_reads(true);
320 opts.set_use_direct_io_for_flush_and_compaction(true);
321
322 opts.add_compact_on_deletion_collector_factory(
323 100, 45, 0.5, );
333
334 opts.set_enable_write_thread_adaptive_yield(false);
336 opts.set_allow_concurrent_memtable_write(false);
337 opts.set_enable_pipelined_write(true);
338 opts.set_inplace_update_support(false);
339 opts.set_unordered_write(true); opts.set_avoid_unnecessary_blocking_io(true); opts.set_auto_tuned_ratelimiter(
343 256 * 1024 * 1024, 100_000, 10, );
347
348 opts.set_periodic_compaction_seconds(3600 * 24); }
350}
351
352pub struct TempCells;
356
357impl ColumnFamily for TempCells {
358 const NAME: &'static str = "temp_cells";
359}
360
361impl ColumnFamilyOptions<TableContext> for TempCells {
362 fn options(opts: &mut rocksdb::Options, ctx: &mut TableContext) {
363 let mut block_factory = BlockBasedOptions::default();
364 block_factory.set_block_cache(&ctx.caches().block_cache);
365 block_factory.set_data_block_index_type(DataBlockIndexType::BinaryAndHash);
366 block_factory.set_whole_key_filtering(true);
367 block_factory.set_checksum_type(rocksdb::ChecksumType::NoChecksum);
368
369 block_factory.set_bloom_filter(10.0, false);
370 block_factory.set_block_size(16 * 1024);
371 block_factory.set_format_version(5);
372
373 opts.set_optimize_filters_for_hits(true);
374 }
375}
376
/// Marker type for the `block_connections` column family.
pub struct BlockConnections;

impl BlockConnections {
    /// Key length constant for this table: parts of 4, 8, 4, 32 and 1,
    /// 49 in total.
    ///
    /// NOTE(review): presumably these are byte widths of the key fields;
    /// confirm the field meanings against the code that builds the keys.
    pub const KEY_LEN: usize = 4 + 8 + 4 + 32 + 1;
}
385
386impl ColumnFamily for BlockConnections {
387 const NAME: &'static str = "block_connections";
388
389 fn read_options(opts: &mut ReadOptions) {
390 opts.set_verify_checksums(false);
391 }
392}
393
394impl ColumnFamilyOptions<TableContext> for BlockConnections {
395 fn options(opts: &mut Options, ctx: &mut TableContext) {
396 default_block_based_table_factory(opts, ctx);
397 optimize_for_point_lookup(opts, ctx);
398 }
399}
400
401pub struct ShardInternalMessagesOld;
405impl ColumnFamily for ShardInternalMessagesOld {
406 const NAME: &'static str = "shard_int_msgs";
407
408 fn read_options(opts: &mut ReadOptions) {
409 opts.set_verify_checksums(true);
410 }
411}
412
413impl ColumnFamilyOptions<TableContext> for ShardInternalMessagesOld {
414 fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
415}
416
417pub struct ShardInternalMessagesUncommitedOld;
419impl ColumnFamily for ShardInternalMessagesUncommitedOld {
420 const NAME: &'static str = "shard_int_msgs_uncommited";
421
422 fn read_options(opts: &mut ReadOptions) {
423 opts.set_verify_checksums(true);
424 }
425}
426impl ColumnFamilyOptions<TableContext> for ShardInternalMessagesUncommitedOld {
427 fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
428}
429
430pub struct InternalMessageStatsOld;
432impl ColumnFamily for InternalMessageStatsOld {
433 const NAME: &'static str = "int_msg_stats";
434
435 fn read_options(opts: &mut ReadOptions) {
436 opts.set_verify_checksums(true);
437 }
438}
439
440impl ColumnFamilyOptions<TableContext> for InternalMessageStatsOld {
441 fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
442}
443
444pub struct InternalMessageStatsUncommitedOld;
446impl ColumnFamily for InternalMessageStatsUncommitedOld {
447 const NAME: &'static str = "int_msg_stats_uncommited";
448
449 fn read_options(opts: &mut ReadOptions) {
450 opts.set_verify_checksums(true);
451 }
452}
453
454impl ColumnFamilyOptions<TableContext> for InternalMessageStatsUncommitedOld {
455 fn options(_opts: &mut Options, _ctx: &mut TableContext) {}
456}
457
458fn archive_data_merge(
459 _: &[u8],
460 current_value: Option<&[u8]>,
461 operands: &MergeOperands,
462) -> Option<Vec<u8>> {
463 let total_len = current_value.map(|data| data.len()).unwrap_or_default()
464 + operands.iter().map(|data| data.len()).sum::<usize>();
465
466 let mut result = Vec::with_capacity(total_len);
467
468 if let Some(current_value) = current_value {
469 result.extend_from_slice(current_value);
470 }
471
472 for data in operands {
473 result.extend_from_slice(data);
474 }
475
476 Some(result)
477}
478
479fn block_handle_merge(
480 _: &[u8],
481 current_value: Option<&[u8]>,
482 operands: &MergeOperands,
483) -> Option<Vec<u8>> {
484 let mut value = [0u8; 12];
485 if let Some(current_value) = current_value {
486 value.copy_from_slice(current_value);
487 }
488
489 for operand in operands {
490 assert_eq!(operand.len(), 12);
491 for (a, b) in std::iter::zip(&mut value, operand) {
492 *a |= *b;
493 }
494 }
495
496 Some(value.to_vec())
497}