1use std::fs::File;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use serde::{Deserialize, Serialize};
8use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
9use tycho_storage::fs::MappedFile;
10use tycho_types::boc::Boc;
11use tycho_types::models::{
12 BlockId, IntAddr, Message, MsgInfo, OutMsgQueueUpdates, ShardStateUnsplit,
13};
14use tycho_util::serde_helpers;
15
16use crate::blockchain_rpc::BlockchainRpcClient;
17use crate::global_config::ZerostateId;
18use crate::storage::{CoreStorage, QueueStateReader};
19
20mod cold_boot;
21
22#[derive(Default, Debug, Clone, Serialize, Deserialize)]
23#[serde(deny_unknown_fields)]
24pub struct StarterConfig {
25 #[serde(with = "serde_helpers::humantime")]
29 pub custom_boot_offset: Option<Duration>,
30}
31
32pub struct StarterBuilder<
33 MandatoryFields = (CoreStorage, BlockchainRpcClient, ZerostateId, StarterConfig),
34> {
35 mandatory_fields: MandatoryFields,
36 optional_fields: BuilderFields,
37}
38
39impl Default for StarterBuilder<((), (), (), ())> {
40 #[inline]
41 fn default() -> Self {
42 Self {
43 mandatory_fields: Default::default(),
44 optional_fields: Default::default(),
45 }
46 }
47}
48
49impl StarterBuilder {
50 pub fn build(self) -> Starter {
51 let (storage, blockchain_rpc_client, zerostate, config) = self.mandatory_fields;
52 let BuilderFields {
53 queue_state_handler,
54 } = self.optional_fields;
55
56 Starter {
57 inner: Arc::new(StarterInner {
58 storage,
59 blockchain_rpc_client,
60 zerostate,
61 config,
62 queue_state_handler: queue_state_handler
63 .unwrap_or_else(|| Box::new(ValidateQueueState)),
64 }),
65 }
66 }
67}
68
69impl<T2, T3, T4> StarterBuilder<((), T2, T3, T4)> {
70 pub fn with_storage(self, storage: CoreStorage) -> StarterBuilder<(CoreStorage, T2, T3, T4)> {
72 let ((), client, id, config) = self.mandatory_fields;
73 StarterBuilder {
74 mandatory_fields: (storage, client, id, config),
75 optional_fields: self.optional_fields,
76 }
77 }
78}
79
80impl<T1, T3, T4> StarterBuilder<(T1, (), T3, T4)> {
81 pub fn with_blockchain_rpc_client(
82 self,
83 client: BlockchainRpcClient,
84 ) -> StarterBuilder<(T1, BlockchainRpcClient, T3, T4)> {
85 let (storage, (), id, config) = self.mandatory_fields;
86 StarterBuilder {
87 mandatory_fields: (storage, client, id, config),
88 optional_fields: self.optional_fields,
89 }
90 }
91}
92
93impl<T1, T2, T4> StarterBuilder<(T1, T2, (), T4)> {
94 pub fn with_zerostate_id(
95 self,
96 zerostate_id: ZerostateId,
97 ) -> StarterBuilder<(T1, T2, ZerostateId, T4)> {
98 let (storage, client, (), config) = self.mandatory_fields;
99 StarterBuilder {
100 mandatory_fields: (storage, client, zerostate_id, config),
101 optional_fields: self.optional_fields,
102 }
103 }
104}
105
106impl<T1, T2, T3> StarterBuilder<(T1, T2, T3, ())> {
107 pub fn with_config(self, config: StarterConfig) -> StarterBuilder<(T1, T2, T3, StarterConfig)> {
108 let (storage, client, id, ()) = self.mandatory_fields;
109 StarterBuilder {
110 mandatory_fields: (storage, client, id, config),
111 optional_fields: self.optional_fields,
112 }
113 }
114}
115
116impl<T> StarterBuilder<T> {
117 pub fn with_queue_state_handler<H: QueueStateHandler>(mut self, handler: H) -> Self {
118 self.optional_fields.queue_state_handler = Some(castaway::match_type!(handler, {
119 Box<dyn QueueStateHandler> as handler => handler,
120 handler => Box::new(handler),
121 }));
122 self
123 }
124}
125
126#[derive(Default)]
127struct BuilderFields {
128 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
129}
130
131#[derive(Clone)]
134#[repr(transparent)]
135pub struct Starter {
136 inner: Arc<StarterInner>,
137}
138
139impl Starter {
140 pub fn builder() -> StarterBuilder<((), (), (), ())> {
141 StarterBuilder::default()
142 }
143
144 pub fn config(&self) -> &StarterConfig {
145 &self.inner.config
146 }
147
148 pub fn queue_state_handler(&self) -> &dyn QueueStateHandler {
149 self.inner.queue_state_handler.as_ref()
150 }
151
152 pub async fn cold_boot<P>(
156 &self,
157 boot_type: ColdBootType,
158 zerostate_provider: Option<P>,
159 ) -> Result<BlockId>
160 where
161 P: ZerostateProvider,
162 {
163 self.inner.cold_boot(boot_type, zerostate_provider).await
164 }
165}
166
167#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
168#[cfg_attr(feature = "cli", derive(clap::ValueEnum))]
169pub enum ColdBootType {
170 Genesis,
171 LatestPersistent,
172}
173
174struct StarterInner {
175 storage: CoreStorage,
176 blockchain_rpc_client: BlockchainRpcClient,
177 zerostate: ZerostateId,
178 config: StarterConfig,
179 queue_state_handler: Box<dyn QueueStateHandler>,
180}
181
182pub trait ZerostateProvider {
183 fn load_zerostates(
184 &self,
185 tracker: &MinRefMcStateTracker,
186 ) -> impl Iterator<Item = Result<ShardStateStuff>>;
187}
188
189impl ZerostateProvider for () {
190 fn load_zerostates(
191 &self,
192 _: &MinRefMcStateTracker,
193 ) -> impl Iterator<Item = Result<ShardStateStuff>> {
194 std::iter::empty()
195 }
196}
197
198pub struct FileZerostateProvider(pub Vec<PathBuf>);
199
200impl ZerostateProvider for FileZerostateProvider {
201 fn load_zerostates(
202 &self,
203 tracker: &MinRefMcStateTracker,
204 ) -> impl Iterator<Item = Result<ShardStateStuff>> {
205 self.0.iter().map(move |path| load_zerostate(tracker, path))
206 }
207}
208
209fn load_zerostate(tracker: &MinRefMcStateTracker, path: &PathBuf) -> Result<ShardStateStuff> {
210 let data = std::fs::read(path).context("failed to read file")?;
211 let file_hash = Boc::file_hash_blake(&data);
212
213 let root = Boc::decode(data).context("failed to decode BOC")?;
214 let root_hash = *root.repr_hash();
215
216 let state = root
217 .parse::<ShardStateUnsplit>()
218 .context("failed to parse state")?;
219
220 anyhow::ensure!(state.seqno == 0, "not a zerostate");
221
222 let block_id = BlockId {
223 shard: state.shard_ident,
224 seqno: state.seqno,
225 root_hash,
226 file_hash,
227 };
228
229 ShardStateStuff::from_root(&block_id, root, tracker.insert_untracked())
230}
231
232#[async_trait::async_trait]
233pub trait QueueStateHandler: Send + Sync + 'static {
234 async fn import_from_file(
235 &self,
236 top_update: &OutMsgQueueUpdates,
237 file: File,
238 block_id: &BlockId,
239 ) -> Result<()>;
240}
241
242#[async_trait::async_trait]
243impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Arc<T> {
244 async fn import_from_file(
245 &self,
246 top_update: &OutMsgQueueUpdates,
247 file: File,
248 block_id: &BlockId,
249 ) -> Result<()> {
250 T::import_from_file(self, top_update, file, block_id).await
251 }
252}
253
254#[async_trait::async_trait]
255impl<T: QueueStateHandler + ?Sized> QueueStateHandler for Box<T> {
256 async fn import_from_file(
257 &self,
258 top_update: &OutMsgQueueUpdates,
259 file: File,
260 block_id: &BlockId,
261 ) -> Result<()> {
262 T::import_from_file(self, top_update, file, block_id).await
263 }
264}
265
266#[derive(Debug, Clone, Copy)]
268pub struct ValidateQueueState;
269
270#[async_trait::async_trait]
271impl QueueStateHandler for ValidateQueueState {
272 async fn import_from_file(
273 &self,
274 top_update: &OutMsgQueueUpdates,
275 file: File,
276 block_id: &BlockId,
277 ) -> Result<()> {
278 tracing::info!(%block_id, "validating internal queue state from file");
279
280 let top_update = top_update.clone();
281
282 let span = tracing::Span::current();
283 tokio::task::spawn_blocking(move || {
284 let _span = span.enter();
285
286 let mapped = MappedFile::from_existing_file(file)?;
287
288 let mut reader = QueueStateReader::begin_from_mapped(mapped.as_slice(), &top_update)?;
289
290 while let Some(mut part) = reader.read_next_queue_diff()? {
291 while let Some(cell) = part.read_next_message()? {
292 let msg_hash = cell.repr_hash();
293 let msg = cell.parse::<Message<'_>>()?;
294 let MsgInfo::Int(int_msg_info) = &msg.info else {
295 anyhow::bail!("non-internal message in the queue in msg {msg_hash}");
296 };
297
298 let IntAddr::Std(_dest) = &int_msg_info.dst else {
299 anyhow::bail!("non-std destination address in msg {msg_hash}");
300 };
301
302 let IntAddr::Std(_src) = &int_msg_info.src else {
303 anyhow::bail!("non-std destination address in msg {msg_hash}");
304 };
305 }
306 }
307
308 reader.finish()
309 })
310 .await?
311 }
312}