Skip to main content

tycho_core/block_strider/starter/
mod.rs

1use std::fs::File;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Result;
7use bytes::Bytes;
8use serde::{Deserialize, Serialize};
9use tycho_types::models::{BlockId, IntAddr, Message, MsgInfo, OutMsgQueueUpdates};
10use tycho_util::config::PartialConfig;
11use tycho_util::fs::MappedFile;
12use tycho_util::serde_helpers;
13
14use self::starter_client::StarterClient;
15use crate::blockchain_rpc::BlockchainRpcClient;
16use crate::global_config::ZerostateId;
17#[cfg(feature = "s3")]
18use crate::s3::S3Client;
19use crate::storage::{CoreStorage, QueueStateReader};
20
21mod cold_boot;
22mod starter_client;
23
24#[derive(Default, Debug, Clone, PartialConfig, Serialize, Deserialize)]
25#[serde(default, deny_unknown_fields)]
26pub struct StarterConfig {
27    /// Choose persistent state which is at least this old.
28    ///
29    /// Default: None
30    #[serde(with = "serde_helpers::humantime")]
31    pub custom_boot_offset: Option<Duration>,
32
33    /// Choose the nearest persistent state strictly before this seqno.
34    ///
35    /// Default: None
36    #[important]
37    pub start_from: Option<u32>,
38}
39
40pub struct StarterBuilder<
41    MandatoryFields = (CoreStorage, BlockchainRpcClient, ZerostateId, StarterConfig),
42> {
43    mandatory_fields: MandatoryFields,
44    optional_fields: BuilderFields,
45}
46
47impl Default for StarterBuilder<((), (), (), ())> {
48    #[inline]
49    fn default() -> Self {
50        Self {
51            mandatory_fields: Default::default(),
52            optional_fields: Default::default(),
53        }
54    }
55}
56
57impl StarterBuilder {
58    pub fn build(self) -> Starter {
59        let (storage, blockchain_rpc_client, zerostate, config) = self.mandatory_fields;
60        let BuilderFields {
61            ignore_states,
62            queue_state_handler,
63            #[cfg(feature = "s3")]
64            s3_client,
65        } = self.optional_fields;
66
67        #[allow(unused_labels)]
68        let starter_client: Arc<dyn StarterClient> = 'client: {
69            #[cfg(feature = "s3")]
70            if let Some(s3_client) = s3_client {
71                use self::starter_client::{HybridStarterClient, S3StarterClient};
72
73                break 'client Arc::new(HybridStarterClient::new(
74                    blockchain_rpc_client.clone(),
75                    S3StarterClient::new(s3_client, storage.clone()),
76                ));
77            }
78
79            Arc::new(blockchain_rpc_client.clone())
80        };
81
82        Starter {
83            inner: Arc::new(StarterInner {
84                ignore_states,
85                storage,
86                starter_client,
87                blockchain_rpc_client,
88                zerostate,
89                config,
90                queue_state_handler: queue_state_handler
91                    .unwrap_or_else(|| Box::new(ValidateQueueState)),
92            }),
93        }
94    }
95}
96
97impl<T2, T3, T4> StarterBuilder<((), T2, T3, T4)> {
98    pub fn with_storage(self, storage: CoreStorage) -> StarterBuilder<(CoreStorage, T2, T3, T4)> {
99        let ((), client, id, config) = self.mandatory_fields;
100        StarterBuilder {
101            mandatory_fields: (storage, client, id, config),
102            optional_fields: self.optional_fields,
103        }
104    }
105}
106
107impl<T1, T3, T4> StarterBuilder<(T1, (), T3, T4)> {
108    pub fn with_blockchain_rpc_client(
109        self,
110        client: BlockchainRpcClient,
111    ) -> StarterBuilder<(T1, BlockchainRpcClient, T3, T4)> {
112        let (storage, (), id, config) = self.mandatory_fields;
113        StarterBuilder {
114            mandatory_fields: (storage, client, id, config),
115            optional_fields: self.optional_fields,
116        }
117    }
118}
119
120impl<T1, T2, T4> StarterBuilder<(T1, T2, (), T4)> {
121    pub fn with_zerostate_id(
122        self,
123        zerostate_id: ZerostateId,
124    ) -> StarterBuilder<(T1, T2, ZerostateId, T4)> {
125        let (storage, client, (), config) = self.mandatory_fields;
126        StarterBuilder {
127            mandatory_fields: (storage, client, zerostate_id, config),
128            optional_fields: self.optional_fields,
129        }
130    }
131}
132
133impl<T1, T2, T3> StarterBuilder<(T1, T2, T3, ())> {
134    pub fn with_config(self, config: StarterConfig) -> StarterBuilder<(T1, T2, T3, StarterConfig)> {
135        let (storage, client, id, ()) = self.mandatory_fields;
136        StarterBuilder {
137            mandatory_fields: (storage, client, id, config),
138            optional_fields: self.optional_fields,
139        }
140    }
141}
142
143impl<T> StarterBuilder<T> {
144    /// Whether to skip downloading persistent states.
145    pub fn ignore_states(mut self, ignore_states: bool) -> Self {
146        self.optional_fields.ignore_states = ignore_states;
147        self
148    }
149
150    pub fn with_queue_state_handler<H: QueueStateHandler>(mut self, handler: H) -> Self {
151        self.optional_fields.queue_state_handler = Some(castaway::match_type!(handler, {
152            Box<dyn QueueStateHandler> as handler => handler,
153            handler => Box::new(handler),
154        }));
155        self
156    }
157
158    #[cfg(feature = "s3")]
159    pub fn with_s3_client(mut self, client: S3Client) -> Self {
160        self.optional_fields.s3_client = Some(client);
161        self
162    }
163}
164
165#[derive(Default)]
166struct BuilderFields {
167    ignore_states: bool,
168    queue_state_handler: Option<Box<dyn QueueStateHandler>>,
169
170    #[cfg(feature = "s3")]
171    s3_client: Option<S3Client>,
172}
173
174/// Bootstrapping utils.
175// TODO: Use it as a block provider?
176#[derive(Clone)]
177#[repr(transparent)]
178pub struct Starter {
179    inner: Arc<StarterInner>,
180}
181
182impl Starter {
183    pub fn builder() -> StarterBuilder<((), (), (), ())> {
184        StarterBuilder::default()
185    }
186
187    pub fn config(&self) -> &StarterConfig {
188        &self.inner.config
189    }
190
191    pub fn queue_state_handler(&self) -> &dyn QueueStateHandler {
192        self.inner.queue_state_handler.as_ref()
193    }
194
195    /// Boot type when the node has not yet started syncing
196    ///
197    /// Returns the last masterchain key block id.
198    pub async fn cold_boot<P>(
199        &self,
200        boot_type: ColdBootType,
201        zerostate_provider: Option<P>,
202    ) -> Result<BlockId>
203    where
204        P: ZerostateProvider,
205    {
206        self.inner.cold_boot(boot_type, zerostate_provider).await
207    }
208}
209
210#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
211#[cfg_attr(feature = "cli", derive(clap::ValueEnum))]
212pub enum ColdBootType {
213    Genesis,
214    LatestPersistent,
215}
216
217struct StarterInner {
218    ignore_states: bool,
219    storage: CoreStorage,
220    starter_client: Arc<dyn StarterClient>,
221    // TODO: Access blockchain only though the starter client.
222    blockchain_rpc_client: BlockchainRpcClient,
223    zerostate: ZerostateId,
224    config: StarterConfig,
225    queue_state_handler: Box<dyn QueueStateHandler>,
226}
227
228pub trait ZerostateProvider {
229    fn load_zerostates(&self) -> impl Iterator<Item = Result<Bytes>>;
230}
231
232impl ZerostateProvider for () {
233    fn load_zerostates(&self) -> impl Iterator<Item = Result<Bytes>> {
234        std::iter::empty()
235    }
236}
237
238pub struct FileZerostateProvider(pub Vec<PathBuf>);
239
240impl ZerostateProvider for FileZerostateProvider {
241    fn load_zerostates(&self) -> impl Iterator<Item = Result<Bytes>> {
242        self.0.iter().map(load_zerostate)
243    }
244}
245
246fn load_zerostate(path: &PathBuf) -> Result<Bytes> {
247    tracing::info!(path = %path.display(), "loading zerostate");
248    #[allow(clippy::disallowed_methods)]
249    let mf = MappedFile::from_existing_file(File::open(path)?)?;
250    let bytes = Bytes::from_owner(mf);
251    Ok(bytes)
252}
253
254#[async_trait::async_trait]
255pub trait QueueStateHandler: Send + Sync + 'static {
256    async fn import_from_file(
257        &self,
258        top_update: &OutMsgQueueUpdates,
259        file: File,
260        block_id: &BlockId,
261    ) -> Result<()>;
262}
263
264#[async_trait::async_trait]
265impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Arc<T> {
266    async fn import_from_file(
267        &self,
268        top_update: &OutMsgQueueUpdates,
269        file: File,
270        block_id: &BlockId,
271    ) -> Result<()> {
272        T::import_from_file(self, top_update, file, block_id).await
273    }
274}
275
276#[async_trait::async_trait]
277impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Box<T> {
278    async fn import_from_file(
279        &self,
280        top_update: &OutMsgQueueUpdates,
281        file: File,
282        block_id: &BlockId,
283    ) -> Result<()> {
284        T::import_from_file(self, top_update, file, block_id).await
285    }
286}
287
288/// Does some basic validation of the provided queue state.
289#[derive(Debug, Clone, Copy)]
290pub struct ValidateQueueState;
291
292#[async_trait::async_trait]
293impl QueueStateHandler for ValidateQueueState {
294    async fn import_from_file(
295        &self,
296        top_update: &OutMsgQueueUpdates,
297        file: File,
298        block_id: &BlockId,
299    ) -> Result<()> {
300        tracing::info!(%block_id, "validating internal queue state from file");
301
302        let top_update = top_update.clone();
303
304        let span = tracing::Span::current();
305        tokio::task::spawn_blocking(move || {
306            let _span = span.enter();
307
308            let mapped = MappedFile::from_existing_file(file)?;
309
310            let mut reader = QueueStateReader::begin_from_mapped(mapped.as_slice(), &top_update)?;
311
312            while let Some(mut part) = reader.read_next_queue_diff()? {
313                while let Some(cell) = part.read_next_message()? {
314                    let msg_hash = cell.repr_hash();
315                    let msg = cell.parse::<Message<'_>>()?;
316                    let MsgInfo::Int(int_msg_info) = &msg.info else {
317                        anyhow::bail!("non-internal message in the queue in msg {msg_hash}");
318                    };
319
320                    let IntAddr::Std(_dest) = &int_msg_info.dst else {
321                        anyhow::bail!("non-std destination address in msg {msg_hash}");
322                    };
323
324                    let IntAddr::Std(_src) = &int_msg_info.src else {
325                        anyhow::bail!("non-std destination address in msg {msg_hash}");
326                    };
327                }
328            }
329
330            reader.finish()
331        })
332        .await?
333    }
334}