1use tycho_storage::kv::{
2 Migrations, NamedTables, StateVersionProvider, TableContext, WithMigrations,
3};
4use tycho_util::sync::CancellationFlag;
5use weedb::{MigrationError, Semver, VersionProvider, WeeDb};
6
7use super::tables;
8
9pub type CoreDb = WeeDb<CoreTables>;
10
11pub trait CoreDbExt {
12 fn normalize_version(&self) -> anyhow::Result<()>;
13}
14
15impl CoreDbExt for CoreDb {
16 fn normalize_version(&self) -> anyhow::Result<()> {
18 let provider = CoreTables::new_version_provider();
19
20 if provider.get_version(self.raw())?.is_some() {
22 return Ok(());
23 }
24
25 {
27 let mut package_entires_iter = self.package_entries.raw_iterator();
28 package_entires_iter.seek_to_first();
29 package_entires_iter.status()?;
30 if package_entires_iter.item().is_none() {
31 return Ok(());
32 }
33 }
34
35 tracing::warn!("normalizing DB version");
37 provider.set_version(self.raw(), [0, 0, 1])?;
38 Ok(())
39 }
40}
41
42impl NamedTables for CoreTables {
43 const NAME: &'static str = "base";
44}
45
46impl WithMigrations for CoreTables {
47 const VERSION: Semver = [0, 0, 3];
48
49 type VersionProvider = StateVersionProvider<tables::State>;
50
51 fn new_version_provider() -> Self::VersionProvider {
52 StateVersionProvider::new::<Self>()
53 }
54
55 fn register_migrations(
56 migrations: &mut Migrations<Self::VersionProvider, Self>,
57 cancelled: CancellationFlag,
58 ) -> Result<(), MigrationError> {
59 migrations.register([0, 0, 1], [0, 0, 2], move |db| {
60 core_migrations::v0_0_1_to_0_0_2(db, cancelled.clone())
61 })?;
62 migrations.register([0, 0, 2], [0, 0, 3], core_migrations::v_0_0_2_to_v_0_0_3)?;
63
64 Ok(())
65 }
66}
67
68weedb::tables! {
69 pub struct CoreTables<TableContext> {
70 pub state: tables::State,
71 pub archives: tables::Archives,
72 pub archive_block_ids: tables::ArchiveBlockIds,
73 pub block_handles: tables::BlockHandles,
74 pub key_blocks: tables::KeyBlocks,
75 pub full_block_ids: tables::FullBlockIds,
76 pub package_entries: tables::PackageEntries,
77 pub block_data_entries: tables::BlockDataEntries,
78 pub shard_states: tables::ShardStates,
79 pub cells: tables::Cells,
80 pub temp_cells: tables::TempCells,
81 pub block_connections: tables::BlockConnections,
82
83 _shard_internal_messages: tables::ShardInternalMessagesOld,
85 _int_msg_stats_uncommited: tables::InternalMessageStatsUncommitedOld,
86 _shard_int_msgs_uncommited: tables::ShardInternalMessagesUncommitedOld,
87 _internal_message_stats: tables::InternalMessageStatsOld,
88 }
89}
90
91mod core_migrations {
92 use std::time::Instant;
93
94 use tycho_block_util::archive::ArchiveEntryType;
95 use tycho_storage::kv::StoredValue;
96 use tycho_types::boc::Boc;
97 use weedb::rocksdb::CompactOptions;
98
99 use super::*;
100 use crate::storage::PackageEntryKey;
101
102 pub fn v0_0_1_to_0_0_2(db: &CoreDb, cancelled: CancellationFlag) -> Result<(), MigrationError> {
103 let mut block_data_iter = db.package_entries.raw_iterator();
104 block_data_iter.seek_to_first();
105
106 tracing::info!("stated migrating package entries");
107
108 let started_at = Instant::now();
109 let mut total_processed = 0usize;
110 let mut block_ids_created = 0usize;
111
112 let full_block_ids_cf = &db.full_block_ids.cf();
113 let mut batch = weedb::rocksdb::WriteBatch::default();
114 let mut cancelled = cancelled.debounce(10);
115 loop {
116 let (key, value) = match block_data_iter.item() {
117 Some(item) if !cancelled.check() => item,
118 Some(_) => return Err(MigrationError::Custom(anyhow::anyhow!("cancelled").into())),
119 None => {
120 block_data_iter.status()?;
121 break;
122 }
123 };
124
125 'item: {
126 let key = PackageEntryKey::from_slice(key);
127 if key.ty != ArchiveEntryType::Block {
128 break 'item;
129 }
130
131 let file_hash = Boc::file_hash_blake(value);
132 batch.put_cf(full_block_ids_cf, key.block_id.to_vec(), file_hash);
133 block_ids_created += 1;
134 }
135
136 block_data_iter.next();
137 total_processed += 1;
138 }
139
140 db.rocksdb()
141 .write_opt(batch, db.full_block_ids.write_config())?;
142
143 tracing::info!(
144 elapsed = %humantime::format_duration(started_at.elapsed()),
145 total_processed,
146 block_ids_created,
147 "finished migrating package entries"
148 );
149 Ok(())
150 }
151
152 pub fn v_0_0_2_to_v_0_0_3(db: &CoreDb) -> Result<(), MigrationError> {
153 let mut opts = CompactOptions::default();
154 opts.set_exclusive_manual_compaction(true);
155 let null = Option::<&[u8]>::None;
156
157 let started_at = Instant::now();
158 tracing::info!("started cells compaction");
159 db.cells
160 .db()
161 .compact_range_cf_opt(&db.cells.cf(), null, null, &opts);
162 tracing::info!(
163 elapsed = %humantime::format_duration(started_at.elapsed()),
164 "finished cells compaction"
165 );
166
167 Ok(())
168 }
169}