tycho_core/block_strider/starter/
mod.rs

1use std::fs::File;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use serde::{Deserialize, Serialize};
8use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
9use tycho_types::boc::Boc;
10use tycho_types::models::{
11    BlockId, IntAddr, Message, MsgInfo, OutMsgQueueUpdates, ShardStateUnsplit,
12};
13use tycho_util::config::PartialConfig;
14use tycho_util::fs::MappedFile;
15use tycho_util::serde_helpers;
16
17use self::starter_client::StarterClient;
18use crate::blockchain_rpc::BlockchainRpcClient;
19use crate::global_config::ZerostateId;
20#[cfg(feature = "s3")]
21use crate::s3::S3Client;
22use crate::storage::{CoreStorage, QueueStateReader};
23
24mod cold_boot;
25mod starter_client;
26
27#[derive(Default, Debug, Clone, PartialConfig, Serialize, Deserialize)]
28#[serde(default, deny_unknown_fields)]
29pub struct StarterConfig {
30    /// Choose persistent state which is at least this old.
31    ///
32    /// Default: None
33    #[serde(with = "serde_helpers::humantime")]
34    pub custom_boot_offset: Option<Duration>,
35
36    /// Choose the nearest persistent state strictly before this seqno.
37    ///
38    /// Default: None
39    #[important]
40    pub start_from: Option<u32>,
41}
42
43pub struct StarterBuilder<
44    MandatoryFields = (CoreStorage, BlockchainRpcClient, ZerostateId, StarterConfig),
45> {
46    mandatory_fields: MandatoryFields,
47    optional_fields: BuilderFields,
48}
49
50impl Default for StarterBuilder<((), (), (), ())> {
51    #[inline]
52    fn default() -> Self {
53        Self {
54            mandatory_fields: Default::default(),
55            optional_fields: Default::default(),
56        }
57    }
58}
59
60impl StarterBuilder {
61    pub fn build(self) -> Starter {
62        let (storage, blockchain_rpc_client, zerostate, config) = self.mandatory_fields;
63        let BuilderFields {
64            ignore_states,
65            queue_state_handler,
66            #[cfg(feature = "s3")]
67            s3_client,
68        } = self.optional_fields;
69
70        #[allow(unused_labels)]
71        let starter_client: Arc<dyn StarterClient> = 'client: {
72            #[cfg(feature = "s3")]
73            if let Some(s3_client) = s3_client {
74                use self::starter_client::{HybridStarterClient, S3StarterClient};
75
76                break 'client Arc::new(HybridStarterClient::new(
77                    blockchain_rpc_client.clone(),
78                    S3StarterClient::new(s3_client, storage.clone()),
79                ));
80            }
81
82            Arc::new(blockchain_rpc_client.clone())
83        };
84
85        Starter {
86            inner: Arc::new(StarterInner {
87                ignore_states,
88                storage,
89                starter_client,
90                blockchain_rpc_client,
91                zerostate,
92                config,
93                queue_state_handler: queue_state_handler
94                    .unwrap_or_else(|| Box::new(ValidateQueueState)),
95            }),
96        }
97    }
98}
99
100impl<T2, T3, T4> StarterBuilder<((), T2, T3, T4)> {
101    pub fn with_storage(self, storage: CoreStorage) -> StarterBuilder<(CoreStorage, T2, T3, T4)> {
102        let ((), client, id, config) = self.mandatory_fields;
103        StarterBuilder {
104            mandatory_fields: (storage, client, id, config),
105            optional_fields: self.optional_fields,
106        }
107    }
108}
109
110impl<T1, T3, T4> StarterBuilder<(T1, (), T3, T4)> {
111    pub fn with_blockchain_rpc_client(
112        self,
113        client: BlockchainRpcClient,
114    ) -> StarterBuilder<(T1, BlockchainRpcClient, T3, T4)> {
115        let (storage, (), id, config) = self.mandatory_fields;
116        StarterBuilder {
117            mandatory_fields: (storage, client, id, config),
118            optional_fields: self.optional_fields,
119        }
120    }
121}
122
123impl<T1, T2, T4> StarterBuilder<(T1, T2, (), T4)> {
124    pub fn with_zerostate_id(
125        self,
126        zerostate_id: ZerostateId,
127    ) -> StarterBuilder<(T1, T2, ZerostateId, T4)> {
128        let (storage, client, (), config) = self.mandatory_fields;
129        StarterBuilder {
130            mandatory_fields: (storage, client, zerostate_id, config),
131            optional_fields: self.optional_fields,
132        }
133    }
134}
135
136impl<T1, T2, T3> StarterBuilder<(T1, T2, T3, ())> {
137    pub fn with_config(self, config: StarterConfig) -> StarterBuilder<(T1, T2, T3, StarterConfig)> {
138        let (storage, client, id, ()) = self.mandatory_fields;
139        StarterBuilder {
140            mandatory_fields: (storage, client, id, config),
141            optional_fields: self.optional_fields,
142        }
143    }
144}
145
146impl<T> StarterBuilder<T> {
147    /// Whether to skip downloading persistent states.
148    pub fn ignore_states(mut self, ignore_states: bool) -> Self {
149        self.optional_fields.ignore_states = ignore_states;
150        self
151    }
152
153    pub fn with_queue_state_handler<H: QueueStateHandler>(mut self, handler: H) -> Self {
154        self.optional_fields.queue_state_handler = Some(castaway::match_type!(handler, {
155            Box<dyn QueueStateHandler> as handler => handler,
156            handler => Box::new(handler),
157        }));
158        self
159    }
160
161    #[cfg(feature = "s3")]
162    pub fn with_s3_client(mut self, client: S3Client) -> Self {
163        self.optional_fields.s3_client = Some(client);
164        self
165    }
166}
167
168#[derive(Default)]
169struct BuilderFields {
170    ignore_states: bool,
171    queue_state_handler: Option<Box<dyn QueueStateHandler>>,
172
173    #[cfg(feature = "s3")]
174    s3_client: Option<S3Client>,
175}
176
177/// Bootstrapping utils.
178// TODO: Use it as a block provider?
179#[derive(Clone)]
180#[repr(transparent)]
181pub struct Starter {
182    inner: Arc<StarterInner>,
183}
184
185impl Starter {
186    pub fn builder() -> StarterBuilder<((), (), (), ())> {
187        StarterBuilder::default()
188    }
189
190    pub fn config(&self) -> &StarterConfig {
191        &self.inner.config
192    }
193
194    pub fn queue_state_handler(&self) -> &dyn QueueStateHandler {
195        self.inner.queue_state_handler.as_ref()
196    }
197
198    /// Boot type when the node has not yet started syncing
199    ///
200    /// Returns the last masterchain key block id.
201    pub async fn cold_boot<P>(
202        &self,
203        boot_type: ColdBootType,
204        zerostate_provider: Option<P>,
205    ) -> Result<BlockId>
206    where
207        P: ZerostateProvider,
208    {
209        self.inner.cold_boot(boot_type, zerostate_provider).await
210    }
211}
212
213#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
214#[cfg_attr(feature = "cli", derive(clap::ValueEnum))]
215pub enum ColdBootType {
216    Genesis,
217    LatestPersistent,
218}
219
220struct StarterInner {
221    ignore_states: bool,
222    storage: CoreStorage,
223    starter_client: Arc<dyn StarterClient>,
224    // TODO: Access blockchain only though the starter client.
225    blockchain_rpc_client: BlockchainRpcClient,
226    zerostate: ZerostateId,
227    config: StarterConfig,
228    queue_state_handler: Box<dyn QueueStateHandler>,
229}
230
231pub trait ZerostateProvider {
232    fn load_zerostates(
233        &self,
234        tracker: &MinRefMcStateTracker,
235    ) -> impl Iterator<Item = Result<ShardStateStuff>>;
236}
237
238impl ZerostateProvider for () {
239    fn load_zerostates(
240        &self,
241        _: &MinRefMcStateTracker,
242    ) -> impl Iterator<Item = Result<ShardStateStuff>> {
243        std::iter::empty()
244    }
245}
246
247pub struct FileZerostateProvider(pub Vec<PathBuf>);
248
249impl ZerostateProvider for FileZerostateProvider {
250    fn load_zerostates(
251        &self,
252        tracker: &MinRefMcStateTracker,
253    ) -> impl Iterator<Item = Result<ShardStateStuff>> {
254        self.0.iter().map(move |path| load_zerostate(tracker, path))
255    }
256}
257
258fn load_zerostate(tracker: &MinRefMcStateTracker, path: &PathBuf) -> Result<ShardStateStuff> {
259    let data = std::fs::read(path).context("failed to read file")?;
260    let file_hash = Boc::file_hash_blake(&data);
261
262    let root = Boc::decode(data).context("failed to decode BOC")?;
263    let root_hash = *root.repr_hash();
264
265    let state = root
266        .parse::<ShardStateUnsplit>()
267        .context("failed to parse state")?;
268
269    anyhow::ensure!(state.seqno == 0, "not a zerostate");
270
271    let block_id = BlockId {
272        shard: state.shard_ident,
273        seqno: state.seqno,
274        root_hash,
275        file_hash,
276    };
277
278    ShardStateStuff::from_root(&block_id, root, tracker.insert_untracked())
279}
280
281#[async_trait::async_trait]
282pub trait QueueStateHandler: Send + Sync + 'static {
283    async fn import_from_file(
284        &self,
285        top_update: &OutMsgQueueUpdates,
286        file: File,
287        block_id: &BlockId,
288    ) -> Result<()>;
289}
290
291#[async_trait::async_trait]
292impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Arc<T> {
293    async fn import_from_file(
294        &self,
295        top_update: &OutMsgQueueUpdates,
296        file: File,
297        block_id: &BlockId,
298    ) -> Result<()> {
299        T::import_from_file(self, top_update, file, block_id).await
300    }
301}
302
303#[async_trait::async_trait]
304impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Box<T> {
305    async fn import_from_file(
306        &self,
307        top_update: &OutMsgQueueUpdates,
308        file: File,
309        block_id: &BlockId,
310    ) -> Result<()> {
311        T::import_from_file(self, top_update, file, block_id).await
312    }
313}
314
315/// Does some basic validation of the provided queue state.
316#[derive(Debug, Clone, Copy)]
317pub struct ValidateQueueState;
318
319#[async_trait::async_trait]
320impl QueueStateHandler for ValidateQueueState {
321    async fn import_from_file(
322        &self,
323        top_update: &OutMsgQueueUpdates,
324        file: File,
325        block_id: &BlockId,
326    ) -> Result<()> {
327        tracing::info!(%block_id, "validating internal queue state from file");
328
329        let top_update = top_update.clone();
330
331        let span = tracing::Span::current();
332        tokio::task::spawn_blocking(move || {
333            let _span = span.enter();
334
335            let mapped = MappedFile::from_existing_file(file)?;
336
337            let mut reader = QueueStateReader::begin_from_mapped(mapped.as_slice(), &top_update)?;
338
339            while let Some(mut part) = reader.read_next_queue_diff()? {
340                while let Some(cell) = part.read_next_message()? {
341                    let msg_hash = cell.repr_hash();
342                    let msg = cell.parse::<Message<'_>>()?;
343                    let MsgInfo::Int(int_msg_info) = &msg.info else {
344                        anyhow::bail!("non-internal message in the queue in msg {msg_hash}");
345                    };
346
347                    let IntAddr::Std(_dest) = &int_msg_info.dst else {
348                        anyhow::bail!("non-std destination address in msg {msg_hash}");
349                    };
350
351                    let IntAddr::Std(_src) = &int_msg_info.src else {
352                        anyhow::bail!("non-std destination address in msg {msg_hash}");
353                    };
354                }
355            }
356
357            reader.finish()
358        })
359        .await?
360    }
361}