1use std::fs::File;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use serde::{Deserialize, Serialize};
8use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
9use tycho_types::boc::Boc;
10use tycho_types::models::{
11 BlockId, IntAddr, Message, MsgInfo, OutMsgQueueUpdates, ShardStateUnsplit,
12};
13use tycho_util::config::PartialConfig;
14use tycho_util::fs::MappedFile;
15use tycho_util::serde_helpers;
16
17use self::starter_client::StarterClient;
18use crate::blockchain_rpc::BlockchainRpcClient;
19use crate::global_config::ZerostateId;
20#[cfg(feature = "s3")]
21use crate::s3::S3Client;
22use crate::storage::{CoreStorage, QueueStateReader};
23
24mod cold_boot;
25mod starter_client;
26
27#[derive(Default, Debug, Clone, PartialConfig, Serialize, Deserialize)]
28#[serde(default, deny_unknown_fields)]
29pub struct StarterConfig {
30 #[serde(with = "serde_helpers::humantime")]
34 pub custom_boot_offset: Option<Duration>,
35
36 #[important]
40 pub start_from: Option<u32>,
41}
42
43pub struct StarterBuilder<
44 MandatoryFields = (CoreStorage, BlockchainRpcClient, ZerostateId, StarterConfig),
45> {
46 mandatory_fields: MandatoryFields,
47 optional_fields: BuilderFields,
48}
49
50impl Default for StarterBuilder<((), (), (), ())> {
51 #[inline]
52 fn default() -> Self {
53 Self {
54 mandatory_fields: Default::default(),
55 optional_fields: Default::default(),
56 }
57 }
58}
59
60impl StarterBuilder {
61 pub fn build(self) -> Starter {
62 let (storage, blockchain_rpc_client, zerostate, config) = self.mandatory_fields;
63 let BuilderFields {
64 ignore_states,
65 queue_state_handler,
66 #[cfg(feature = "s3")]
67 s3_client,
68 } = self.optional_fields;
69
70 #[allow(unused_labels)]
71 let starter_client: Arc<dyn StarterClient> = 'client: {
72 #[cfg(feature = "s3")]
73 if let Some(s3_client) = s3_client {
74 use self::starter_client::{HybridStarterClient, S3StarterClient};
75
76 break 'client Arc::new(HybridStarterClient::new(
77 blockchain_rpc_client.clone(),
78 S3StarterClient::new(s3_client, storage.clone()),
79 ));
80 }
81
82 Arc::new(blockchain_rpc_client.clone())
83 };
84
85 Starter {
86 inner: Arc::new(StarterInner {
87 ignore_states,
88 storage,
89 starter_client,
90 blockchain_rpc_client,
91 zerostate,
92 config,
93 queue_state_handler: queue_state_handler
94 .unwrap_or_else(|| Box::new(ValidateQueueState)),
95 }),
96 }
97 }
98}
99
100impl<T2, T3, T4> StarterBuilder<((), T2, T3, T4)> {
101 pub fn with_storage(self, storage: CoreStorage) -> StarterBuilder<(CoreStorage, T2, T3, T4)> {
102 let ((), client, id, config) = self.mandatory_fields;
103 StarterBuilder {
104 mandatory_fields: (storage, client, id, config),
105 optional_fields: self.optional_fields,
106 }
107 }
108}
109
110impl<T1, T3, T4> StarterBuilder<(T1, (), T3, T4)> {
111 pub fn with_blockchain_rpc_client(
112 self,
113 client: BlockchainRpcClient,
114 ) -> StarterBuilder<(T1, BlockchainRpcClient, T3, T4)> {
115 let (storage, (), id, config) = self.mandatory_fields;
116 StarterBuilder {
117 mandatory_fields: (storage, client, id, config),
118 optional_fields: self.optional_fields,
119 }
120 }
121}
122
123impl<T1, T2, T4> StarterBuilder<(T1, T2, (), T4)> {
124 pub fn with_zerostate_id(
125 self,
126 zerostate_id: ZerostateId,
127 ) -> StarterBuilder<(T1, T2, ZerostateId, T4)> {
128 let (storage, client, (), config) = self.mandatory_fields;
129 StarterBuilder {
130 mandatory_fields: (storage, client, zerostate_id, config),
131 optional_fields: self.optional_fields,
132 }
133 }
134}
135
136impl<T1, T2, T3> StarterBuilder<(T1, T2, T3, ())> {
137 pub fn with_config(self, config: StarterConfig) -> StarterBuilder<(T1, T2, T3, StarterConfig)> {
138 let (storage, client, id, ()) = self.mandatory_fields;
139 StarterBuilder {
140 mandatory_fields: (storage, client, id, config),
141 optional_fields: self.optional_fields,
142 }
143 }
144}
145
146impl<T> StarterBuilder<T> {
147 pub fn ignore_states(mut self, ignore_states: bool) -> Self {
149 self.optional_fields.ignore_states = ignore_states;
150 self
151 }
152
153 pub fn with_queue_state_handler<H: QueueStateHandler>(mut self, handler: H) -> Self {
154 self.optional_fields.queue_state_handler = Some(castaway::match_type!(handler, {
155 Box<dyn QueueStateHandler> as handler => handler,
156 handler => Box::new(handler),
157 }));
158 self
159 }
160
161 #[cfg(feature = "s3")]
162 pub fn with_s3_client(mut self, client: S3Client) -> Self {
163 self.optional_fields.s3_client = Some(client);
164 self
165 }
166}
167
168#[derive(Default)]
169struct BuilderFields {
170 ignore_states: bool,
171 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
172
173 #[cfg(feature = "s3")]
174 s3_client: Option<S3Client>,
175}
176
177#[derive(Clone)]
180#[repr(transparent)]
181pub struct Starter {
182 inner: Arc<StarterInner>,
183}
184
185impl Starter {
186 pub fn builder() -> StarterBuilder<((), (), (), ())> {
187 StarterBuilder::default()
188 }
189
190 pub fn config(&self) -> &StarterConfig {
191 &self.inner.config
192 }
193
194 pub fn queue_state_handler(&self) -> &dyn QueueStateHandler {
195 self.inner.queue_state_handler.as_ref()
196 }
197
198 pub async fn cold_boot<P>(
202 &self,
203 boot_type: ColdBootType,
204 zerostate_provider: Option<P>,
205 ) -> Result<BlockId>
206 where
207 P: ZerostateProvider,
208 {
209 self.inner.cold_boot(boot_type, zerostate_provider).await
210 }
211}
212
213#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
214#[cfg_attr(feature = "cli", derive(clap::ValueEnum))]
215pub enum ColdBootType {
216 Genesis,
217 LatestPersistent,
218}
219
220struct StarterInner {
221 ignore_states: bool,
222 storage: CoreStorage,
223 starter_client: Arc<dyn StarterClient>,
224 blockchain_rpc_client: BlockchainRpcClient,
226 zerostate: ZerostateId,
227 config: StarterConfig,
228 queue_state_handler: Box<dyn QueueStateHandler>,
229}
230
231pub trait ZerostateProvider {
232 fn load_zerostates(
233 &self,
234 tracker: &MinRefMcStateTracker,
235 ) -> impl Iterator<Item = Result<ShardStateStuff>>;
236}
237
238impl ZerostateProvider for () {
239 fn load_zerostates(
240 &self,
241 _: &MinRefMcStateTracker,
242 ) -> impl Iterator<Item = Result<ShardStateStuff>> {
243 std::iter::empty()
244 }
245}
246
247pub struct FileZerostateProvider(pub Vec<PathBuf>);
248
249impl ZerostateProvider for FileZerostateProvider {
250 fn load_zerostates(
251 &self,
252 tracker: &MinRefMcStateTracker,
253 ) -> impl Iterator<Item = Result<ShardStateStuff>> {
254 self.0.iter().map(move |path| load_zerostate(tracker, path))
255 }
256}
257
258fn load_zerostate(tracker: &MinRefMcStateTracker, path: &PathBuf) -> Result<ShardStateStuff> {
259 let data = std::fs::read(path).context("failed to read file")?;
260 let file_hash = Boc::file_hash_blake(&data);
261
262 let root = Boc::decode(data).context("failed to decode BOC")?;
263 let root_hash = *root.repr_hash();
264
265 let state = root
266 .parse::<ShardStateUnsplit>()
267 .context("failed to parse state")?;
268
269 anyhow::ensure!(state.seqno == 0, "not a zerostate");
270
271 let block_id = BlockId {
272 shard: state.shard_ident,
273 seqno: state.seqno,
274 root_hash,
275 file_hash,
276 };
277
278 ShardStateStuff::from_root(&block_id, root, tracker.insert_untracked())
279}
280
281#[async_trait::async_trait]
282pub trait QueueStateHandler: Send + Sync + 'static {
283 async fn import_from_file(
284 &self,
285 top_update: &OutMsgQueueUpdates,
286 file: File,
287 block_id: &BlockId,
288 ) -> Result<()>;
289}
290
291#[async_trait::async_trait]
292impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Arc<T> {
293 async fn import_from_file(
294 &self,
295 top_update: &OutMsgQueueUpdates,
296 file: File,
297 block_id: &BlockId,
298 ) -> Result<()> {
299 T::import_from_file(self, top_update, file, block_id).await
300 }
301}
302
303#[async_trait::async_trait]
304impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Box<T> {
305 async fn import_from_file(
306 &self,
307 top_update: &OutMsgQueueUpdates,
308 file: File,
309 block_id: &BlockId,
310 ) -> Result<()> {
311 T::import_from_file(self, top_update, file, block_id).await
312 }
313}
314
315#[derive(Debug, Clone, Copy)]
317pub struct ValidateQueueState;
318
319#[async_trait::async_trait]
320impl QueueStateHandler for ValidateQueueState {
321 async fn import_from_file(
322 &self,
323 top_update: &OutMsgQueueUpdates,
324 file: File,
325 block_id: &BlockId,
326 ) -> Result<()> {
327 tracing::info!(%block_id, "validating internal queue state from file");
328
329 let top_update = top_update.clone();
330
331 let span = tracing::Span::current();
332 tokio::task::spawn_blocking(move || {
333 let _span = span.enter();
334
335 let mapped = MappedFile::from_existing_file(file)?;
336
337 let mut reader = QueueStateReader::begin_from_mapped(mapped.as_slice(), &top_update)?;
338
339 while let Some(mut part) = reader.read_next_queue_diff()? {
340 while let Some(cell) = part.read_next_message()? {
341 let msg_hash = cell.repr_hash();
342 let msg = cell.parse::<Message<'_>>()?;
343 let MsgInfo::Int(int_msg_info) = &msg.info else {
344 anyhow::bail!("non-internal message in the queue in msg {msg_hash}");
345 };
346
347 let IntAddr::Std(_dest) = &int_msg_info.dst else {
348 anyhow::bail!("non-std destination address in msg {msg_hash}");
349 };
350
351 let IntAddr::Std(_src) = &int_msg_info.src else {
352 anyhow::bail!("non-std destination address in msg {msg_hash}");
353 };
354 }
355 }
356
357 reader.finish()
358 })
359 .await?
360 }
361}