Skip to main content

forest/state_migration/common/
state_migration.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::sync::Arc;
5use std::sync::atomic::AtomicU64;
6
7use super::{
8    MigrationCache, Migrator, PostMigrationCheckArc, PostMigratorArc,
9    migration_job::{MigrationJob, MigrationJobOutput},
10    verifier::MigrationVerifier,
11};
12use crate::cid_collections::CidHashMap;
13use crate::shim::{clock::ChainEpoch, state_tree::StateTree};
14use cid::Cid;
15use fvm_ipld_blockstore::Blockstore;
16use nonzero_ext::nonzero;
17use parking_lot::Mutex;
18
19/// Handles several cases of migration:
20/// - nil migrations, essentially mapping one Actor to another,
21/// - migrations where state upgrade is required,
22/// - creating new actors that were not present in the prior network version.
23pub(in crate::state_migration) struct StateMigration<BS> {
24    migrations: CidHashMap<Migrator<BS>>,
25    /// Verifies correctness of the migration specification.
26    verifier: Option<MigrationVerifier<BS>>,
27    /// Post migrator(s). This may include new actor creation.
28    post_migrators: Vec<PostMigratorArc<BS>>,
29    /// Post migration checks. This is used to verify the correctness of the migration.
30    post_migration_checks: Vec<PostMigrationCheckArc<BS>>,
31}
32
33impl<BS: Blockstore> StateMigration<BS> {
34    pub(in crate::state_migration) fn new(verifier: Option<MigrationVerifier<BS>>) -> Self {
35        Self {
36            migrations: CidHashMap::new(),
37            verifier,
38            post_migrators: Default::default(),
39            post_migration_checks: Default::default(),
40        }
41    }
42
43    /// Inserts a new migrator into the migration specification.
44    pub(in crate::state_migration) fn add_migrator(
45        &mut self,
46        prior_cid: Cid,
47        migrator: Migrator<BS>,
48    ) {
49        self.migrations.insert(prior_cid, migrator);
50    }
51
52    /// Inserts a new post migrator into the post migration specification.
53    pub(in crate::state_migration) fn add_post_migrator(
54        &mut self,
55        post_migrator: PostMigratorArc<BS>,
56    ) {
57        self.post_migrators.push(post_migrator);
58    }
59
60    /// Inserts a new post migration check into the post migration checks specification.
61    pub(in crate::state_migration) fn add_post_migration_check(
62        &mut self,
63        post_migration_check: PostMigrationCheckArc<BS>,
64    ) {
65        self.post_migration_checks.push(post_migration_check);
66    }
67}
68
69impl<BS: Blockstore + Send + Sync> StateMigration<BS> {
70    pub(in crate::state_migration) fn migrate_state_tree(
71        &self,
72        store: &BS,
73        prior_epoch: ChainEpoch,
74        actors_in: StateTree<BS>,
75        mut actors_out: StateTree<BS>,
76    ) -> anyhow::Result<Cid> {
77        // Checks if the migration specification is correct
78        if let Some(verifier) = &self.verifier {
79            verifier.verify_migration(store, &self.migrations, &actors_in)?;
80        }
81
82        let cache = MigrationCache::new(nonzero!(10_000usize));
83        let num_threads = std::env::var("FOREST_STATE_MIGRATION_THREADS")
84            .ok()
85            .and_then(|s| s.parse().ok())
86            // Don't use all CPU, otherwise the migration will starve the rest of the system.
87            .unwrap_or_else(|| num_cpus::get() / 2)
88            // At least 3 are required to not deadlock the migration.
89            .max(3);
90
91        let pool = rayon::ThreadPoolBuilder::new()
92            .thread_name(|id| format!("state migration thread: {id}"))
93            .num_threads(num_threads)
94            .build()?;
95
96        let (state_tx, state_rx) = flume::bounded(30);
97        let (job_tx, job_rx) = flume::bounded(30);
98
99        let job_counter = AtomicU64::new(0);
100        let cache_clone = cache.clone();
101
102        let actors_in = Arc::new(Mutex::new(actors_in));
103        let actors_in_clone = actors_in.clone();
104        pool.scope(|s| {
105            s.spawn(move |_| {
106                actors_in.lock()
107                    .for_each(|addr, state| {
108                        state_tx
109                            .send((addr, state.clone()))
110                            .expect("failed sending actor state through channel");
111                        Ok(())
112                    })
113                    .expect("Failed iterating over actor state");
114            });
115
116            s.spawn(move |scope| {
117                while let Ok((address, state)) = state_rx.recv() {
118                    let job_tx = job_tx.clone();
119                    let migrator = self.migrations.get(&state.code).cloned().unwrap_or_else(|| panic!("migration failed with state code: {}", state.code));
120
121                    // Deferred migrations should be done at a later time.
122                    if migrator.is_deferred() {
123                        continue;
124                    }
125                    let cache_clone = cache_clone.clone();
126                    scope.spawn(move |_| {
127                        let job = MigrationJob {
128                            address,
129                            actor_state: state,
130                            actor_migration: migrator,
131                        };
132
133                        let job_output = job.run(store, prior_epoch, cache_clone).unwrap_or_else(|e| {
134                            panic!(
135                                "failed executing job for address: {address}, Reason: {e:#}"
136                            )
137                        });
138
139                        job_tx.send(job_output).unwrap_or_else(|_| {
140                            panic!("failed sending job output for address: {address}")
141                        });
142                    });
143                }
144                drop(job_tx);
145            });
146
147            while let Ok(job_output) = job_rx.recv() {
148                if let Some(MigrationJobOutput {
149                    address,
150                    actor_state,
151                }) = job_output {
152                    actors_out
153                        .set_actor(&address, actor_state)
154                        .unwrap_or_else(|e| {
155                            panic!(
156                                "Failed setting new actor state at given address: {address}, Reason: {e:#}"
157                            )
158                        });
159                    job_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
160                    let job_counter = job_counter.load(std::sync::atomic::Ordering::Relaxed);
161                    if job_counter.is_multiple_of(100_000) {
162                        tracing::info!("Processed {job_counter} actors", job_counter = job_counter);
163                    }
164                }
165            }
166        });
167
168        // This is okay to execute even if there are no deferred migrations, as the iteration is
169        // very cheap; ~200ms on mainnet. The alternative is to collect the deferred migrations
170        // into a separate collection, which would increase the memory footprint of the migration.
171        tracing::info!("Processing deferred migrations");
172        let mut job_counter = 0;
173        actors_in_clone.lock().for_each(|address, state| {
174            job_counter += 1;
175            let migrator = self
176                .migrations
177                .get(&state.code)
178                .cloned()
179                .unwrap_or_else(|| panic!("migration failed with state code: {}", state.code));
180
181            if !migrator.is_deferred() {
182                return Ok(());
183            }
184
185            let job = MigrationJob {
186                address,
187                actor_state: state.clone(),
188                actor_migration: migrator,
189            };
190            let job_output = job.run(store, prior_epoch, cache.clone())?;
191            if let Some(MigrationJobOutput {
192                address,
193                actor_state,
194            }) = job_output
195            {
196                actors_out
197                    .set_actor(&address, actor_state)
198                    .unwrap_or_else(|e| {
199                        panic!(
200                            "Failed setting new actor state at given address: {address}, Reason: {e:#}"
201                        )
202                    });
203            }
204
205            Ok(())
206        })?;
207        tracing::info!("Processed {job_counter} deferred migrations");
208
209        // execute post migration actions, e.g., create new actors
210        for post_migrator in self.post_migrators.iter() {
211            post_migrator.post_migrate_state(store, &mut actors_out)?;
212        }
213
214        // execute post migration checks
215        for post_migration_check in self.post_migration_checks.iter() {
216            post_migration_check.post_migrate_check(store, &actors_out)?;
217        }
218
219        actors_out.flush()
220    }
221}