// tycho_core/block_strider/state_applier.rs
use std::sync::Arc;
2use std::sync::atomic::{AtomicU32, Ordering};
3
4use anyhow::{Context, Result};
5use futures_util::future::BoxFuture;
6use tokio::task::JoinHandle;
7use tycho_block_util::block::BlockStuff;
8use tycho_block_util::dict::split_aug_dict_raw;
9use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff};
10use tycho_types::cell::{Cell, HashBytes};
11use tycho_util::metrics::HistogramGuard;
12use tycho_util::sync::rayon_run;
13
14use crate::block_strider::{
15 BlockSaver, BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext,
16};
17use crate::storage::{BlockHandle, CoreStorage, StoreStateHint};
18
19#[repr(transparent)]
20pub struct ShardStateApplier<S> {
21 inner: Arc<Inner<S>>,
22}
23
24impl<S> ShardStateApplier<S>
25where
26 S: StateSubscriber,
27{
28 pub fn new(storage: CoreStorage, state_subscriber: S) -> Self {
29 Self::new_ext(storage, state_subscriber, true)
30 }
31
32 pub fn without_persistent_states(storage: CoreStorage, state_subscriber: S) -> Self {
33 Self::new_ext(storage, state_subscriber, false)
34 }
35
36 fn new_ext(storage: CoreStorage, state_subscriber: S, store_persistent_states: bool) -> Self {
37 let last_key_block_utime = if store_persistent_states {
38 Self::find_last_key_block_utime(&storage)
39 } else {
40 0
41 };
42
43 Self {
44 inner: Arc::new(Inner {
45 block_saver: BlockSaver::new(storage.clone()),
46 storage,
47 state_subscriber,
48
49 store_persistent_states,
50 last_key_block_utime: AtomicU32::new(last_key_block_utime),
51 prev_state_task: Default::default(),
52 }),
53 }
54 }
55
56 fn find_last_key_block_utime(storage: &CoreStorage) -> u32 {
57 storage
58 .block_handle_storage()
59 .find_last_key_block()
60 .map_or(0, |handle| handle.gen_utime())
61 }
62
63 async fn prepare_block_impl(
64 &self,
65 cx: &BlockSubscriberContext,
66 ) -> Result<StateApplierPrepared> {
67 let _histogram = HistogramGuard::begin("tycho_core_state_applier_prepare_block_time");
68
69 let handle = self.inner.block_saver.save_block(cx).await?;
70
71 tracing::info!(
72 mc_block_id = %cx.mc_block_id.as_short_id(),
73 id = %cx.block.id(),
74 "preparing block",
75 );
76
77 let state_storage = self.inner.storage.shard_state_storage();
78
79 let state = if handle.has_state() {
81 state_storage
83 .load_state(handle.ref_by_mc_seqno(), handle.id())
84 .await
85 .context("failed to load applied shard state")?
86 } else {
87 let (prev_id, prev_id_alt) = cx
89 .block
90 .construct_prev_id()
91 .context("failed to construct prev id")?;
92
93 let (prev_root_cell, handles, old_split_at) = {
94 let prev_state = state_storage
96 .load_state(0, &prev_id)
97 .await
98 .context("failed to load prev shard state")?;
99
100 let old_split_at = split_aug_dict_raw(prev_state.state().load_accounts()?, 5)?
101 .into_keys()
102 .collect::<ahash::HashSet<_>>();
103
104 match &prev_id_alt {
105 Some(prev_id) => {
106 let prev_state_alt = state_storage
108 .load_state(0, prev_id)
109 .await
110 .context("failed to load alt prev shard state")?;
111
112 let cell = ShardStateStuff::construct_split_root(
113 prev_state.root_cell().clone(),
114 prev_state_alt.root_cell().clone(),
115 )?;
116 let left_handle = prev_state.ref_mc_state_handle().clone();
117 let right_handle = prev_state_alt.ref_mc_state_handle().clone();
118 (
119 cell,
120 RefMcStateHandles::Split(left_handle, right_handle),
121 old_split_at,
122 )
123 }
124 None => {
125 let cell = prev_state.root_cell().clone();
126 let handle = prev_state.ref_mc_state_handle().clone();
127 (cell, RefMcStateHandles::Single(handle), old_split_at)
128 }
129 }
130 };
131
132 self.compute_and_store_state_update(
134 &cx.block,
135 &handle,
136 prev_root_cell,
137 old_split_at,
138 handles.min_safe_handle().clone(),
139 )
140 .await?
141 };
142
143 Ok(StateApplierPrepared { state })
144 }
145
146 async fn handle_block_impl(
147 &self,
148 cx: &BlockSubscriberContext,
149 prepared: StateApplierPrepared,
150 ) -> Result<()> {
151 let _histogram = HistogramGuard::begin("tycho_core_state_applier_handle_block_time");
152
153 tracing::info!(
154 mc_block_id = %cx.mc_block_id.as_short_id(),
155 id = %cx.block.id(),
156 "handling block",
157 );
158
159 let _histogram = HistogramGuard::begin("tycho_core_subscriber_handle_state_time");
161
162 let cx = StateSubscriberContext {
163 mc_block_id: cx.mc_block_id,
164 mc_is_key_block: cx.mc_is_key_block,
165 is_key_block: cx.is_key_block,
166 block: cx.block.clone(),
167 archive_data: cx.archive_data.clone(),
168 state: prepared.state,
169 delayed: cx.delayed.clone(),
170 };
171
172 let subscriber_fut = self.inner.state_subscriber.handle_state(&cx);
173
174 if self.inner.store_persistent_states {
175 let applier_fut = self.try_save_persistent_states(&cx);
176 match futures_util::future::join(applier_fut, subscriber_fut).await {
177 (Err(e), _) | (_, Err(e)) => Err(e),
178 _ => Ok(()),
179 }
180 } else {
181 subscriber_fut.await
182 }
183 }
184
185 async fn compute_and_store_state_update(
186 &self,
187 block: &BlockStuff,
188 handle: &BlockHandle,
189 prev_root: Cell,
190 split_at: ahash::HashSet<HashBytes>,
191 ref_mc_state_handle: RefMcStateHandle,
192 ) -> Result<ShardStateStuff> {
193 let labels = [("workchain", block.id().shard.workchain().to_string())];
194 let _histogram =
195 HistogramGuard::begin_with_labels("tycho_core_apply_block_time_high", &labels);
196
197 let update = block
198 .as_ref()
199 .load_state_update()
200 .context("Failed to load state update")?;
201
202 let apply_in_mem = HistogramGuard::begin("tycho_core_apply_block_in_mem_time_high");
203
204 let new_state = rayon_run(move || update.par_apply(&prev_root, &split_at))
205 .await
206 .context("Failed to apply state update")?;
207
208 apply_in_mem.finish();
209
210 let state_storage = self.inner.storage.shard_state_storage();
211
212 let new_state = ShardStateStuff::from_root(block.id(), new_state, ref_mc_state_handle)
213 .context("Failed to create new state")?;
214
215 state_storage
216 .store_state(handle, &new_state, StoreStateHint {
217 block_data_size: Some(block.data_size()),
218 })
219 .await
220 .context("Failed to store new state")?;
221
222 Ok(new_state)
223 }
224
225 async fn try_save_persistent_states(&self, cx: &StateSubscriberContext) -> Result<()> {
226 let this = self.inner.as_ref();
227
228 let mut prev_task = this.prev_state_task.lock().await;
231 if let Some(task) = &mut *prev_task
232 && task.is_finished()
233 {
234 task.join().await?;
235 }
236 drop(prev_task);
237
238 if cx.is_key_block {
239 let block_info = cx.block.load_info()?;
240
241 let prev_utime = this
242 .last_key_block_utime
243 .swap(block_info.gen_utime, Ordering::Relaxed);
244
245 if BlockStuff::compute_is_persistent(block_info.gen_utime, prev_utime) {
246 let mut prev_task = this.prev_state_task.lock().await;
247 if let Some(task) = &mut *prev_task {
248 task.join().await?;
249 }
250
251 let block = cx.block.clone();
252 let inner = self.inner.clone();
253 let state_handle = cx.state.ref_mc_state_handle().clone();
254
255 *prev_task = Some(StorePersistentStateTask {
256 mc_seqno: cx.mc_block_id.seqno,
257 handle: Some(tokio::spawn(async move {
258 inner.save_persistent_states(block, state_handle).await
259 })),
260 });
261 }
262 }
263
264 Ok(())
265 }
266}
267
268impl<S> Clone for ShardStateApplier<S> {
269 #[inline]
270 fn clone(&self) -> Self {
271 Self {
272 inner: self.inner.clone(),
273 }
274 }
275}
276
277impl<S> BlockSubscriber for ShardStateApplier<S>
278where
279 S: StateSubscriber,
280{
281 type Prepared = StateApplierPrepared;
282
283 type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
284 type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
285
286 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
287 Box::pin(self.prepare_block_impl(cx))
288 }
289
290 fn handle_block<'a>(
291 &'a self,
292 cx: &'a BlockSubscriberContext,
293 prepared: Self::Prepared,
294 ) -> Self::HandleBlockFut<'a> {
295 Box::pin(self.handle_block_impl(cx, prepared))
296 }
297}
298
299pub struct StateApplierPrepared {
300 state: ShardStateStuff,
301}
302
303enum RefMcStateHandles {
304 Split(RefMcStateHandle, RefMcStateHandle),
305 Single(RefMcStateHandle),
306}
307
308impl RefMcStateHandles {
309 fn min_safe_handle(&self) -> &RefMcStateHandle {
310 match self {
311 Self::Split(left, right) => left.min_safe(right),
312 Self::Single(handle) => handle,
313 }
314 }
315}
316
317struct Inner<S> {
318 storage: CoreStorage,
319 state_subscriber: S,
320 block_saver: BlockSaver,
321
322 store_persistent_states: bool,
323 last_key_block_utime: AtomicU32,
324 prev_state_task: tokio::sync::Mutex<Option<StorePersistentStateTask>>,
325}
326
327impl<S> Inner<S> {
328 async fn save_persistent_states(
329 &self,
330 mc_block: BlockStuff,
331 mc_state_handle: RefMcStateHandle,
332 ) -> Result<()> {
333 let block_handles = self.storage.block_handle_storage();
334
335 let Some(mc_block_handle) = block_handles.load_handle(mc_block.id()) else {
336 anyhow::bail!("masterchain block handle not found: {}", mc_block.id());
337 };
338 block_handles.set_block_persistent(&mc_block_handle);
339
340 let (state_result, queue_result) = tokio::join!(
341 self.save_persistent_shard_states(
342 mc_block_handle.clone(),
343 mc_block.clone(),
344 mc_state_handle
345 ),
346 self.save_persistent_queue_states(mc_block_handle.clone(), mc_block),
347 );
348 state_result?;
349 queue_result?;
350
351 self.storage
352 .persistent_state_storage()
353 .rotate_persistent_states(&mc_block_handle)
354 .await?;
355
356 metrics::counter!("tycho_core_ps_subscriber_saved_persistent_states_count").increment(1);
357 tracing::debug!("saved persistent state for {}", mc_block_handle.id());
358
359 Ok(())
360 }
361
362 async fn save_persistent_shard_states(
363 &self,
364 mc_block_handle: BlockHandle,
365 mc_block: BlockStuff,
366 mc_state_handle: RefMcStateHandle,
367 ) -> Result<()> {
368 let block_handles = self.storage.block_handle_storage();
369 let persistent_states = self.storage.persistent_state_storage();
370
371 let mc_seqno = mc_block_handle.id().seqno;
372 for entry in mc_block.load_custom()?.shards.latest_blocks() {
373 let block_id = entry?;
374 let Some(block_handle) = block_handles.load_handle(&block_id) else {
375 anyhow::bail!("top shard block handle not found: {block_id}");
376 };
377
378 persistent_states
382 .store_shard_state(mc_seqno, &block_handle, mc_state_handle.clone())
383 .await?;
384 }
385
386 persistent_states
390 .store_shard_state(mc_seqno, &mc_block_handle, mc_state_handle)
391 .await?;
392
393 Ok(())
394 }
395
396 async fn save_persistent_queue_states(
397 &self,
398 mc_block_handle: BlockHandle,
399 mc_block: BlockStuff,
400 ) -> Result<()> {
401 let blocks = self.storage.block_storage();
402 let block_handles = self.storage.block_handle_storage();
403 let persistent_states = self.storage.persistent_state_storage();
404
405 let mut shard_block_handles = Vec::new();
406
407 for entry in mc_block.load_custom()?.shards.latest_blocks() {
408 let block_id = entry?;
409 let Some(block_handle) = block_handles.load_handle(&block_id) else {
410 anyhow::bail!("top shard block handle not found: {block_id}");
411 };
412
413 block_handles.set_block_persistent(&block_handle);
416
417 shard_block_handles.push(block_handle);
418 }
419
420 let mc_seqno = mc_block_handle.id().seqno;
422 for block_handle in shard_block_handles {
423 let block = blocks.load_block_data(&block_handle).await?;
424 persistent_states
425 .store_queue_state(mc_seqno, &block_handle, block)
426 .await?;
427 }
428
429 persistent_states
430 .store_queue_state(mc_seqno, &mc_block_handle, mc_block)
431 .await?;
432
433 Ok(())
434 }
435}
436
437struct StorePersistentStateTask {
438 mc_seqno: u32,
439 handle: Option<JoinHandle<Result<()>>>,
440}
441
442impl StorePersistentStateTask {
443 async fn join(&mut self) -> Result<()> {
444 if let Some(handle) = &mut self.handle {
446 let result = handle
447 .await
448 .map_err(|e| {
449 if e.is_panic() {
450 std::panic::resume_unwind(e.into_panic());
451 }
452 anyhow::Error::from(e)
453 })
454 .and_then(std::convert::identity);
455
456 self.handle = None;
457
458 if let Err(e) = &result {
459 tracing::error!(
460 mc_seqno = self.mc_seqno,
461 "failed to save persistent state: {e:?}"
462 );
463 }
464
465 return result;
466 }
467
468 Ok(())
469 }
470
471 fn is_finished(&self) -> bool {
472 if let Some(handle) = &self.handle {
473 return handle.is_finished();
474 }
475
476 false
477 }
478}