tycho_core/storage/
db.rs

1use tycho_storage::kv::{
2    Migrations, NamedTables, StateVersionProvider, TableContext, WithMigrations,
3};
4use tycho_util::sync::CancellationFlag;
5use weedb::{MigrationError, Semver, VersionProvider, WeeDb};
6
7use super::tables;
8
9pub type CoreDb = WeeDb<CoreTables>;
10
11pub trait CoreDbExt {
12    fn normalize_version(&self) -> anyhow::Result<()>;
13}
14
15impl CoreDbExt for CoreDb {
16    // TEMP: Set a proper version on start. Remove on testnet reset.
17    fn normalize_version(&self) -> anyhow::Result<()> {
18        let provider = CoreTables::new_version_provider();
19
20        // Check if there is NO VERSION
21        if provider.get_version(self.raw())?.is_some() {
22            return Ok(());
23        }
24
25        // Check if the DB is NOT EMPTY
26        {
27            let mut package_entires_iter = self.package_entries.raw_iterator();
28            package_entires_iter.seek_to_first();
29            package_entires_iter.status()?;
30            if package_entires_iter.item().is_none() {
31                return Ok(());
32            }
33        }
34
35        // Set the initial version
36        tracing::warn!("normalizing DB version");
37        provider.set_version(self.raw(), [0, 0, 1])?;
38        Ok(())
39    }
40}
41
42impl NamedTables for CoreTables {
43    const NAME: &'static str = "base";
44}
45
46impl WithMigrations for CoreTables {
47    const VERSION: Semver = [0, 0, 3];
48
49    type VersionProvider = StateVersionProvider<tables::State>;
50
51    fn new_version_provider() -> Self::VersionProvider {
52        StateVersionProvider::new::<Self>()
53    }
54
55    fn register_migrations(
56        migrations: &mut Migrations<Self::VersionProvider, Self>,
57        cancelled: CancellationFlag,
58    ) -> Result<(), MigrationError> {
59        migrations.register([0, 0, 1], [0, 0, 2], move |db| {
60            core_migrations::v0_0_1_to_0_0_2(db, cancelled.clone())
61        })?;
62        migrations.register([0, 0, 2], [0, 0, 3], core_migrations::v_0_0_2_to_v_0_0_3)?;
63
64        Ok(())
65    }
66}
67
68weedb::tables! {
69    pub struct CoreTables<TableContext> {
70        pub state: tables::State,
71        pub archives: tables::Archives,
72        pub archive_block_ids: tables::ArchiveBlockIds,
73        pub block_handles: tables::BlockHandles,
74        pub key_blocks: tables::KeyBlocks,
75        pub full_block_ids: tables::FullBlockIds,
76        pub package_entries: tables::PackageEntries,
77        pub block_data_entries: tables::BlockDataEntries,
78        pub shard_states: tables::ShardStates,
79        pub cells: tables::Cells,
80        pub temp_cells: tables::TempCells,
81        pub block_connections: tables::BlockConnections,
82
83        // tables are empty, but they cannot be deleted because they are in a storage config
84        _shard_internal_messages: tables::ShardInternalMessagesOld,
85        _int_msg_stats_uncommited: tables::InternalMessageStatsUncommitedOld,
86        _shard_int_msgs_uncommited: tables::ShardInternalMessagesUncommitedOld,
87        _internal_message_stats: tables::InternalMessageStatsOld,
88    }
89}
90
91mod core_migrations {
92    use std::time::Instant;
93
94    use tycho_block_util::archive::ArchiveEntryType;
95    use tycho_storage::kv::StoredValue;
96    use tycho_types::boc::Boc;
97    use weedb::rocksdb::CompactOptions;
98
99    use super::*;
100    use crate::storage::PackageEntryKey;
101
102    pub fn v0_0_1_to_0_0_2(db: &CoreDb, cancelled: CancellationFlag) -> Result<(), MigrationError> {
103        let mut block_data_iter = db.package_entries.raw_iterator();
104        block_data_iter.seek_to_first();
105
106        tracing::info!("stated migrating package entries");
107
108        let started_at = Instant::now();
109        let mut total_processed = 0usize;
110        let mut block_ids_created = 0usize;
111
112        let full_block_ids_cf = &db.full_block_ids.cf();
113        let mut batch = weedb::rocksdb::WriteBatch::default();
114        let mut cancelled = cancelled.debounce(10);
115        loop {
116            let (key, value) = match block_data_iter.item() {
117                Some(item) if !cancelled.check() => item,
118                Some(_) => return Err(MigrationError::Custom(anyhow::anyhow!("cancelled").into())),
119                None => {
120                    block_data_iter.status()?;
121                    break;
122                }
123            };
124
125            'item: {
126                let key = PackageEntryKey::from_slice(key);
127                if key.ty != ArchiveEntryType::Block {
128                    break 'item;
129                }
130
131                let file_hash = Boc::file_hash_blake(value);
132                batch.put_cf(full_block_ids_cf, key.block_id.to_vec(), file_hash);
133                block_ids_created += 1;
134            }
135
136            block_data_iter.next();
137            total_processed += 1;
138        }
139
140        db.rocksdb()
141            .write_opt(batch, db.full_block_ids.write_config())?;
142
143        tracing::info!(
144            elapsed = %humantime::format_duration(started_at.elapsed()),
145            total_processed,
146            block_ids_created,
147            "finished migrating package entries"
148        );
149        Ok(())
150    }
151
152    pub fn v_0_0_2_to_v_0_0_3(db: &CoreDb) -> Result<(), MigrationError> {
153        let mut opts = CompactOptions::default();
154        opts.set_exclusive_manual_compaction(true);
155        let null = Option::<&[u8]>::None;
156
157        let started_at = Instant::now();
158        tracing::info!("started cells compaction");
159        db.cells
160            .db()
161            .compact_range_cf_opt(&db.cells.cf(), null, null, &opts);
162        tracing::info!(
163            elapsed = %humantime::format_duration(started_at.elapsed()),
164            "finished cells compaction"
165        );
166
167        Ok(())
168    }
169}