1use std::fs::File;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Result;
7use bytes::Bytes;
8use serde::{Deserialize, Serialize};
9use tycho_types::models::{BlockId, IntAddr, Message, MsgInfo, OutMsgQueueUpdates};
10use tycho_util::config::PartialConfig;
11use tycho_util::fs::MappedFile;
12use tycho_util::serde_helpers;
13
14use self::starter_client::StarterClient;
15use crate::blockchain_rpc::BlockchainRpcClient;
16use crate::global_config::ZerostateId;
17#[cfg(feature = "s3")]
18use crate::s3::S3Client;
19use crate::storage::{CoreStorage, QueueStateReader};
20
21mod cold_boot;
22mod starter_client;
23
24#[derive(Default, Debug, Clone, PartialConfig, Serialize, Deserialize)]
25#[serde(default, deny_unknown_fields)]
26pub struct StarterConfig {
27 #[serde(with = "serde_helpers::humantime")]
31 pub custom_boot_offset: Option<Duration>,
32
33 #[important]
37 pub start_from: Option<u32>,
38}
39
40pub struct StarterBuilder<
41 MandatoryFields = (CoreStorage, BlockchainRpcClient, ZerostateId, StarterConfig),
42> {
43 mandatory_fields: MandatoryFields,
44 optional_fields: BuilderFields,
45}
46
47impl Default for StarterBuilder<((), (), (), ())> {
48 #[inline]
49 fn default() -> Self {
50 Self {
51 mandatory_fields: Default::default(),
52 optional_fields: Default::default(),
53 }
54 }
55}
56
57impl StarterBuilder {
58 pub fn build(self) -> Starter {
59 let (storage, blockchain_rpc_client, zerostate, config) = self.mandatory_fields;
60 let BuilderFields {
61 ignore_states,
62 queue_state_handler,
63 #[cfg(feature = "s3")]
64 s3_client,
65 } = self.optional_fields;
66
67 #[allow(unused_labels)]
68 let starter_client: Arc<dyn StarterClient> = 'client: {
69 #[cfg(feature = "s3")]
70 if let Some(s3_client) = s3_client {
71 use self::starter_client::{HybridStarterClient, S3StarterClient};
72
73 break 'client Arc::new(HybridStarterClient::new(
74 blockchain_rpc_client.clone(),
75 S3StarterClient::new(s3_client, storage.clone()),
76 ));
77 }
78
79 Arc::new(blockchain_rpc_client.clone())
80 };
81
82 Starter {
83 inner: Arc::new(StarterInner {
84 ignore_states,
85 storage,
86 starter_client,
87 blockchain_rpc_client,
88 zerostate,
89 config,
90 queue_state_handler: queue_state_handler
91 .unwrap_or_else(|| Box::new(ValidateQueueState)),
92 }),
93 }
94 }
95}
96
97impl<T2, T3, T4> StarterBuilder<((), T2, T3, T4)> {
98 pub fn with_storage(self, storage: CoreStorage) -> StarterBuilder<(CoreStorage, T2, T3, T4)> {
99 let ((), client, id, config) = self.mandatory_fields;
100 StarterBuilder {
101 mandatory_fields: (storage, client, id, config),
102 optional_fields: self.optional_fields,
103 }
104 }
105}
106
107impl<T1, T3, T4> StarterBuilder<(T1, (), T3, T4)> {
108 pub fn with_blockchain_rpc_client(
109 self,
110 client: BlockchainRpcClient,
111 ) -> StarterBuilder<(T1, BlockchainRpcClient, T3, T4)> {
112 let (storage, (), id, config) = self.mandatory_fields;
113 StarterBuilder {
114 mandatory_fields: (storage, client, id, config),
115 optional_fields: self.optional_fields,
116 }
117 }
118}
119
120impl<T1, T2, T4> StarterBuilder<(T1, T2, (), T4)> {
121 pub fn with_zerostate_id(
122 self,
123 zerostate_id: ZerostateId,
124 ) -> StarterBuilder<(T1, T2, ZerostateId, T4)> {
125 let (storage, client, (), config) = self.mandatory_fields;
126 StarterBuilder {
127 mandatory_fields: (storage, client, zerostate_id, config),
128 optional_fields: self.optional_fields,
129 }
130 }
131}
132
133impl<T1, T2, T3> StarterBuilder<(T1, T2, T3, ())> {
134 pub fn with_config(self, config: StarterConfig) -> StarterBuilder<(T1, T2, T3, StarterConfig)> {
135 let (storage, client, id, ()) = self.mandatory_fields;
136 StarterBuilder {
137 mandatory_fields: (storage, client, id, config),
138 optional_fields: self.optional_fields,
139 }
140 }
141}
142
143impl<T> StarterBuilder<T> {
144 pub fn ignore_states(mut self, ignore_states: bool) -> Self {
146 self.optional_fields.ignore_states = ignore_states;
147 self
148 }
149
150 pub fn with_queue_state_handler<H: QueueStateHandler>(mut self, handler: H) -> Self {
151 self.optional_fields.queue_state_handler = Some(castaway::match_type!(handler, {
152 Box<dyn QueueStateHandler> as handler => handler,
153 handler => Box::new(handler),
154 }));
155 self
156 }
157
158 #[cfg(feature = "s3")]
159 pub fn with_s3_client(mut self, client: S3Client) -> Self {
160 self.optional_fields.s3_client = Some(client);
161 self
162 }
163}
164
165#[derive(Default)]
166struct BuilderFields {
167 ignore_states: bool,
168 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
169
170 #[cfg(feature = "s3")]
171 s3_client: Option<S3Client>,
172}
173
174#[derive(Clone)]
177#[repr(transparent)]
178pub struct Starter {
179 inner: Arc<StarterInner>,
180}
181
182impl Starter {
183 pub fn builder() -> StarterBuilder<((), (), (), ())> {
184 StarterBuilder::default()
185 }
186
187 pub fn config(&self) -> &StarterConfig {
188 &self.inner.config
189 }
190
191 pub fn queue_state_handler(&self) -> &dyn QueueStateHandler {
192 self.inner.queue_state_handler.as_ref()
193 }
194
195 pub async fn cold_boot<P>(
199 &self,
200 boot_type: ColdBootType,
201 zerostate_provider: Option<P>,
202 ) -> Result<BlockId>
203 where
204 P: ZerostateProvider,
205 {
206 self.inner.cold_boot(boot_type, zerostate_provider).await
207 }
208}
209
210#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
211#[cfg_attr(feature = "cli", derive(clap::ValueEnum))]
212pub enum ColdBootType {
213 Genesis,
214 LatestPersistent,
215}
216
217struct StarterInner {
218 ignore_states: bool,
219 storage: CoreStorage,
220 starter_client: Arc<dyn StarterClient>,
221 blockchain_rpc_client: BlockchainRpcClient,
223 zerostate: ZerostateId,
224 config: StarterConfig,
225 queue_state_handler: Box<dyn QueueStateHandler>,
226}
227
228pub trait ZerostateProvider {
229 fn load_zerostates(&self) -> impl Iterator<Item = Result<Bytes>>;
230}
231
232impl ZerostateProvider for () {
233 fn load_zerostates(&self) -> impl Iterator<Item = Result<Bytes>> {
234 std::iter::empty()
235 }
236}
237
238pub struct FileZerostateProvider(pub Vec<PathBuf>);
239
240impl ZerostateProvider for FileZerostateProvider {
241 fn load_zerostates(&self) -> impl Iterator<Item = Result<Bytes>> {
242 self.0.iter().map(load_zerostate)
243 }
244}
245
246fn load_zerostate(path: &PathBuf) -> Result<Bytes> {
247 tracing::info!(path = %path.display(), "loading zerostate");
248 #[allow(clippy::disallowed_methods)]
249 let mf = MappedFile::from_existing_file(File::open(path)?)?;
250 let bytes = Bytes::from_owner(mf);
251 Ok(bytes)
252}
253
254#[async_trait::async_trait]
255pub trait QueueStateHandler: Send + Sync + 'static {
256 async fn import_from_file(
257 &self,
258 top_update: &OutMsgQueueUpdates,
259 file: File,
260 block_id: &BlockId,
261 ) -> Result<()>;
262}
263
264#[async_trait::async_trait]
265impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Arc<T> {
266 async fn import_from_file(
267 &self,
268 top_update: &OutMsgQueueUpdates,
269 file: File,
270 block_id: &BlockId,
271 ) -> Result<()> {
272 T::import_from_file(self, top_update, file, block_id).await
273 }
274}
275
276#[async_trait::async_trait]
277impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Box<T> {
278 async fn import_from_file(
279 &self,
280 top_update: &OutMsgQueueUpdates,
281 file: File,
282 block_id: &BlockId,
283 ) -> Result<()> {
284 T::import_from_file(self, top_update, file, block_id).await
285 }
286}
287
288#[derive(Debug, Clone, Copy)]
290pub struct ValidateQueueState;
291
292#[async_trait::async_trait]
293impl QueueStateHandler for ValidateQueueState {
294 async fn import_from_file(
295 &self,
296 top_update: &OutMsgQueueUpdates,
297 file: File,
298 block_id: &BlockId,
299 ) -> Result<()> {
300 tracing::info!(%block_id, "validating internal queue state from file");
301
302 let top_update = top_update.clone();
303
304 let span = tracing::Span::current();
305 tokio::task::spawn_blocking(move || {
306 let _span = span.enter();
307
308 let mapped = MappedFile::from_existing_file(file)?;
309
310 let mut reader = QueueStateReader::begin_from_mapped(mapped.as_slice(), &top_update)?;
311
312 while let Some(mut part) = reader.read_next_queue_diff()? {
313 while let Some(cell) = part.read_next_message()? {
314 let msg_hash = cell.repr_hash();
315 let msg = cell.parse::<Message<'_>>()?;
316 let MsgInfo::Int(int_msg_info) = &msg.info else {
317 anyhow::bail!("non-internal message in the queue in msg {msg_hash}");
318 };
319
320 let IntAddr::Std(_dest) = &int_msg_info.dst else {
321 anyhow::bail!("non-std destination address in msg {msg_hash}");
322 };
323
324 let IntAddr::Std(_src) = &int_msg_info.src else {
325 anyhow::bail!("non-std destination address in msg {msg_hash}");
326 };
327 }
328 }
329
330 reader.finish()
331 })
332 .await?
333 }
334}