forest/state_migration/common/
state_migration.rs1use std::sync::Arc;
5use std::sync::atomic::AtomicU64;
6
7use super::{
8 MigrationCache, Migrator, PostMigrationCheckArc, PostMigratorArc,
9 migration_job::{MigrationJob, MigrationJobOutput},
10 verifier::MigrationVerifier,
11};
12use crate::cid_collections::CidHashMap;
13use crate::shim::{clock::ChainEpoch, state_tree::StateTree};
14use cid::Cid;
15use fvm_ipld_blockstore::Blockstore;
16use nonzero_ext::nonzero;
17use parking_lot::Mutex;
18
19pub(in crate::state_migration) struct StateMigration<BS> {
24 migrations: CidHashMap<Migrator<BS>>,
25 verifier: Option<MigrationVerifier<BS>>,
27 post_migrators: Vec<PostMigratorArc<BS>>,
29 post_migration_checks: Vec<PostMigrationCheckArc<BS>>,
31}
32
33impl<BS: Blockstore> StateMigration<BS> {
34 pub(in crate::state_migration) fn new(verifier: Option<MigrationVerifier<BS>>) -> Self {
35 Self {
36 migrations: CidHashMap::new(),
37 verifier,
38 post_migrators: Default::default(),
39 post_migration_checks: Default::default(),
40 }
41 }
42
43 pub(in crate::state_migration) fn add_migrator(
45 &mut self,
46 prior_cid: Cid,
47 migrator: Migrator<BS>,
48 ) {
49 self.migrations.insert(prior_cid, migrator);
50 }
51
52 pub(in crate::state_migration) fn add_post_migrator(
54 &mut self,
55 post_migrator: PostMigratorArc<BS>,
56 ) {
57 self.post_migrators.push(post_migrator);
58 }
59
60 pub(in crate::state_migration) fn add_post_migration_check(
62 &mut self,
63 post_migration_check: PostMigrationCheckArc<BS>,
64 ) {
65 self.post_migration_checks.push(post_migration_check);
66 }
67}
68
69impl<BS: Blockstore + Send + Sync> StateMigration<BS> {
70 pub(in crate::state_migration) fn migrate_state_tree(
71 &self,
72 store: &BS,
73 prior_epoch: ChainEpoch,
74 actors_in: StateTree<BS>,
75 mut actors_out: StateTree<BS>,
76 ) -> anyhow::Result<Cid> {
77 if let Some(verifier) = &self.verifier {
79 verifier.verify_migration(store, &self.migrations, &actors_in)?;
80 }
81
82 let cache = MigrationCache::new(nonzero!(10_000usize));
83 let num_threads = std::env::var("FOREST_STATE_MIGRATION_THREADS")
84 .ok()
85 .and_then(|s| s.parse().ok())
86 .unwrap_or_else(|| num_cpus::get() / 2)
88 .max(3);
90
91 let pool = rayon::ThreadPoolBuilder::new()
92 .thread_name(|id| format!("state migration thread: {id}"))
93 .num_threads(num_threads)
94 .build()?;
95
96 let (state_tx, state_rx) = flume::bounded(30);
97 let (job_tx, job_rx) = flume::bounded(30);
98
99 let job_counter = AtomicU64::new(0);
100 let cache_clone = cache.clone();
101
102 let actors_in = Arc::new(Mutex::new(actors_in));
103 let actors_in_clone = actors_in.clone();
104 pool.scope(|s| {
105 s.spawn(move |_| {
106 actors_in.lock()
107 .for_each(|addr, state| {
108 state_tx
109 .send((addr, state.clone()))
110 .expect("failed sending actor state through channel");
111 Ok(())
112 })
113 .expect("Failed iterating over actor state");
114 });
115
116 s.spawn(move |scope| {
117 while let Ok((address, state)) = state_rx.recv() {
118 let job_tx = job_tx.clone();
119 let migrator = self.migrations.get(&state.code).cloned().unwrap_or_else(|| panic!("migration failed with state code: {}", state.code));
120
121 if migrator.is_deferred() {
123 continue;
124 }
125 let cache_clone = cache_clone.clone();
126 scope.spawn(move |_| {
127 let job = MigrationJob {
128 address,
129 actor_state: state,
130 actor_migration: migrator,
131 };
132
133 let job_output = job.run(store, prior_epoch, cache_clone).unwrap_or_else(|e| {
134 panic!(
135 "failed executing job for address: {address}, Reason: {e:#}"
136 )
137 });
138
139 job_tx.send(job_output).unwrap_or_else(|_| {
140 panic!("failed sending job output for address: {address}")
141 });
142 });
143 }
144 drop(job_tx);
145 });
146
147 while let Ok(job_output) = job_rx.recv() {
148 if let Some(MigrationJobOutput {
149 address,
150 actor_state,
151 }) = job_output {
152 actors_out
153 .set_actor(&address, actor_state)
154 .unwrap_or_else(|e| {
155 panic!(
156 "Failed setting new actor state at given address: {address}, Reason: {e:#}"
157 )
158 });
159 job_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
160 let job_counter = job_counter.load(std::sync::atomic::Ordering::Relaxed);
161 if job_counter.is_multiple_of(100_000) {
162 tracing::info!("Processed {job_counter} actors", job_counter = job_counter);
163 }
164 }
165 }
166 });
167
168 tracing::info!("Processing deferred migrations");
172 let mut job_counter = 0;
173 actors_in_clone.lock().for_each(|address, state| {
174 job_counter += 1;
175 let migrator = self
176 .migrations
177 .get(&state.code)
178 .cloned()
179 .unwrap_or_else(|| panic!("migration failed with state code: {}", state.code));
180
181 if !migrator.is_deferred() {
182 return Ok(());
183 }
184
185 let job = MigrationJob {
186 address,
187 actor_state: state.clone(),
188 actor_migration: migrator,
189 };
190 let job_output = job.run(store, prior_epoch, cache.clone())?;
191 if let Some(MigrationJobOutput {
192 address,
193 actor_state,
194 }) = job_output
195 {
196 actors_out
197 .set_actor(&address, actor_state)
198 .unwrap_or_else(|e| {
199 panic!(
200 "Failed setting new actor state at given address: {address}, Reason: {e:#}"
201 )
202 });
203 }
204
205 Ok(())
206 })?;
207 tracing::info!("Processed {job_counter} deferred migrations");
208
209 for post_migrator in self.post_migrators.iter() {
211 post_migrator.post_migrate_state(store, &mut actors_out)?;
212 }
213
214 for post_migration_check in self.post_migration_checks.iter() {
216 post_migration_check.post_migrate_check(store, &actors_out)?;
217 }
218
219 actors_out.flush()
220 }
221}