tycho_core/block_strider/starter/
mod.rs

1use std::fs::File;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use serde::{Deserialize, Serialize};
8use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
9use tycho_storage::fs::MappedFile;
10use tycho_types::boc::Boc;
11use tycho_types::models::{
12    BlockId, IntAddr, Message, MsgInfo, OutMsgQueueUpdates, ShardStateUnsplit,
13};
14use tycho_util::serde_helpers;
15
16use crate::blockchain_rpc::BlockchainRpcClient;
17use crate::global_config::ZerostateId;
18use crate::storage::{CoreStorage, QueueStateReader};
19
20mod cold_boot;
21
22#[derive(Default, Debug, Clone, Serialize, Deserialize)]
23#[serde(deny_unknown_fields)]
24pub struct StarterConfig {
25    /// Choose persistent state which is at least this old.
26    ///
27    /// Default: None
28    #[serde(with = "serde_helpers::humantime")]
29    pub custom_boot_offset: Option<Duration>,
30}
31
32pub struct StarterBuilder<
33    MandatoryFields = (CoreStorage, BlockchainRpcClient, ZerostateId, StarterConfig),
34> {
35    mandatory_fields: MandatoryFields,
36    optional_fields: BuilderFields,
37}
38
39impl Default for StarterBuilder<((), (), (), ())> {
40    #[inline]
41    fn default() -> Self {
42        Self {
43            mandatory_fields: Default::default(),
44            optional_fields: Default::default(),
45        }
46    }
47}
48
49impl StarterBuilder {
50    pub fn build(self) -> Starter {
51        let (storage, blockchain_rpc_client, zerostate, config) = self.mandatory_fields;
52        let BuilderFields {
53            queue_state_handler,
54        } = self.optional_fields;
55
56        Starter {
57            inner: Arc::new(StarterInner {
58                storage,
59                blockchain_rpc_client,
60                zerostate,
61                config,
62                queue_state_handler: queue_state_handler
63                    .unwrap_or_else(|| Box::new(ValidateQueueState)),
64            }),
65        }
66    }
67}
68
69impl<T2, T3, T4> StarterBuilder<((), T2, T3, T4)> {
70    // TODO: Use `CoreStorage`.
71    pub fn with_storage(self, storage: CoreStorage) -> StarterBuilder<(CoreStorage, T2, T3, T4)> {
72        let ((), client, id, config) = self.mandatory_fields;
73        StarterBuilder {
74            mandatory_fields: (storage, client, id, config),
75            optional_fields: self.optional_fields,
76        }
77    }
78}
79
80impl<T1, T3, T4> StarterBuilder<(T1, (), T3, T4)> {
81    pub fn with_blockchain_rpc_client(
82        self,
83        client: BlockchainRpcClient,
84    ) -> StarterBuilder<(T1, BlockchainRpcClient, T3, T4)> {
85        let (storage, (), id, config) = self.mandatory_fields;
86        StarterBuilder {
87            mandatory_fields: (storage, client, id, config),
88            optional_fields: self.optional_fields,
89        }
90    }
91}
92
93impl<T1, T2, T4> StarterBuilder<(T1, T2, (), T4)> {
94    pub fn with_zerostate_id(
95        self,
96        zerostate_id: ZerostateId,
97    ) -> StarterBuilder<(T1, T2, ZerostateId, T4)> {
98        let (storage, client, (), config) = self.mandatory_fields;
99        StarterBuilder {
100            mandatory_fields: (storage, client, zerostate_id, config),
101            optional_fields: self.optional_fields,
102        }
103    }
104}
105
106impl<T1, T2, T3> StarterBuilder<(T1, T2, T3, ())> {
107    pub fn with_config(self, config: StarterConfig) -> StarterBuilder<(T1, T2, T3, StarterConfig)> {
108        let (storage, client, id, ()) = self.mandatory_fields;
109        StarterBuilder {
110            mandatory_fields: (storage, client, id, config),
111            optional_fields: self.optional_fields,
112        }
113    }
114}
115
116impl<T> StarterBuilder<T> {
117    pub fn with_queue_state_handler<H: QueueStateHandler>(mut self, handler: H) -> Self {
118        self.optional_fields.queue_state_handler = Some(castaway::match_type!(handler, {
119            Box<dyn QueueStateHandler> as handler => handler,
120            handler => Box::new(handler),
121        }));
122        self
123    }
124}
125
126#[derive(Default)]
127struct BuilderFields {
128    queue_state_handler: Option<Box<dyn QueueStateHandler>>,
129}
130
131/// Bootstrapping utils.
132// TODO: Use it as a block provider?
133#[derive(Clone)]
134#[repr(transparent)]
135pub struct Starter {
136    inner: Arc<StarterInner>,
137}
138
139impl Starter {
140    pub fn builder() -> StarterBuilder<((), (), (), ())> {
141        StarterBuilder::default()
142    }
143
144    pub fn config(&self) -> &StarterConfig {
145        &self.inner.config
146    }
147
148    pub fn queue_state_handler(&self) -> &dyn QueueStateHandler {
149        self.inner.queue_state_handler.as_ref()
150    }
151
152    /// Boot type when the node has not yet started syncing
153    ///
154    /// Returns the last masterchain key block id.
155    pub async fn cold_boot<P>(
156        &self,
157        boot_type: ColdBootType,
158        zerostate_provider: Option<P>,
159    ) -> Result<BlockId>
160    where
161        P: ZerostateProvider,
162    {
163        self.inner.cold_boot(boot_type, zerostate_provider).await
164    }
165}
166
167#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
168#[cfg_attr(feature = "cli", derive(clap::ValueEnum))]
169pub enum ColdBootType {
170    Genesis,
171    LatestPersistent,
172}
173
174struct StarterInner {
175    storage: CoreStorage,
176    blockchain_rpc_client: BlockchainRpcClient,
177    zerostate: ZerostateId,
178    config: StarterConfig,
179    queue_state_handler: Box<dyn QueueStateHandler>,
180}
181
182pub trait ZerostateProvider {
183    fn load_zerostates(
184        &self,
185        tracker: &MinRefMcStateTracker,
186    ) -> impl Iterator<Item = Result<ShardStateStuff>>;
187}
188
189impl ZerostateProvider for () {
190    fn load_zerostates(
191        &self,
192        _: &MinRefMcStateTracker,
193    ) -> impl Iterator<Item = Result<ShardStateStuff>> {
194        std::iter::empty()
195    }
196}
197
198pub struct FileZerostateProvider(pub Vec<PathBuf>);
199
200impl ZerostateProvider for FileZerostateProvider {
201    fn load_zerostates(
202        &self,
203        tracker: &MinRefMcStateTracker,
204    ) -> impl Iterator<Item = Result<ShardStateStuff>> {
205        self.0.iter().map(move |path| load_zerostate(tracker, path))
206    }
207}
208
209fn load_zerostate(tracker: &MinRefMcStateTracker, path: &PathBuf) -> Result<ShardStateStuff> {
210    let data = std::fs::read(path).context("failed to read file")?;
211    let file_hash = Boc::file_hash_blake(&data);
212
213    let root = Boc::decode(data).context("failed to decode BOC")?;
214    let root_hash = *root.repr_hash();
215
216    let state = root
217        .parse::<ShardStateUnsplit>()
218        .context("failed to parse state")?;
219
220    anyhow::ensure!(state.seqno == 0, "not a zerostate");
221
222    let block_id = BlockId {
223        shard: state.shard_ident,
224        seqno: state.seqno,
225        root_hash,
226        file_hash,
227    };
228
229    ShardStateStuff::from_root(&block_id, root, tracker.insert_untracked())
230}
231
232#[async_trait::async_trait]
233pub trait QueueStateHandler: Send + Sync + 'static {
234    async fn import_from_file(
235        &self,
236        top_update: &OutMsgQueueUpdates,
237        file: File,
238        block_id: &BlockId,
239    ) -> Result<()>;
240}
241
242#[async_trait::async_trait]
243impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Arc<T> {
244    async fn import_from_file(
245        &self,
246        top_update: &OutMsgQueueUpdates,
247        file: File,
248        block_id: &BlockId,
249    ) -> Result<()> {
250        T::import_from_file(self, top_update, file, block_id).await
251    }
252}
253
254#[async_trait::async_trait]
255impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Box<T> {
256    async fn import_from_file(
257        &self,
258        top_update: &OutMsgQueueUpdates,
259        file: File,
260        block_id: &BlockId,
261    ) -> Result<()> {
262        T::import_from_file(self, top_update, file, block_id).await
263    }
264}
265
266/// Does some basic validation of the provided queue state.
267#[derive(Debug, Clone, Copy)]
268pub struct ValidateQueueState;
269
270#[async_trait::async_trait]
271impl QueueStateHandler for ValidateQueueState {
272    async fn import_from_file(
273        &self,
274        top_update: &OutMsgQueueUpdates,
275        file: File,
276        block_id: &BlockId,
277    ) -> Result<()> {
278        tracing::info!(%block_id, "validating internal queue state from file");
279
280        let top_update = top_update.clone();
281
282        let span = tracing::Span::current();
283        tokio::task::spawn_blocking(move || {
284            let _span = span.enter();
285
286            let mapped = MappedFile::from_existing_file(file)?;
287
288            let mut reader = QueueStateReader::begin_from_mapped(mapped.as_slice(), &top_update)?;
289
290            while let Some(mut part) = reader.read_next_queue_diff()? {
291                while let Some(cell) = part.read_next_message()? {
292                    let msg_hash = cell.repr_hash();
293                    let msg = cell.parse::<Message<'_>>()?;
294                    let MsgInfo::Int(int_msg_info) = &msg.info else {
295                        anyhow::bail!("non-internal message in the queue in msg {msg_hash}");
296                    };
297
298                    let IntAddr::Std(_dest) = &int_msg_info.dst else {
299                        anyhow::bail!("non-std destination address in msg {msg_hash}");
300                    };
301
302                    let IntAddr::Std(_src) = &int_msg_info.src else {
303                        anyhow::bail!("non-std destination address in msg {msg_hash}");
304                    };
305                }
306            }
307
308            reader.finish()
309        })
310        .await?
311    }
312}