Skip to main content

tycho_core/block_strider/
state_applier.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU32, Ordering};
3
4use anyhow::{Context, Result};
5use futures_util::future::BoxFuture;
6use tokio::task::JoinHandle;
7use tycho_block_util::block::BlockStuff;
8use tycho_block_util::dict::split_aug_dict_raw;
9use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff};
10use tycho_types::cell::{Cell, HashBytes};
11use tycho_util::metrics::HistogramGuard;
12use tycho_util::sync::rayon_run;
13
14use crate::block_strider::{
15    BlockSaver, BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext,
16};
17use crate::storage::{BlockHandle, CoreStorage, StoreStateHint};
18
19#[repr(transparent)]
20pub struct ShardStateApplier<S> {
21    inner: Arc<Inner<S>>,
22}
23
24impl<S> ShardStateApplier<S>
25where
26    S: StateSubscriber,
27{
28    pub fn new(storage: CoreStorage, state_subscriber: S) -> Self {
29        Self::new_ext(storage, state_subscriber, true)
30    }
31
32    pub fn without_persistent_states(storage: CoreStorage, state_subscriber: S) -> Self {
33        Self::new_ext(storage, state_subscriber, false)
34    }
35
36    fn new_ext(storage: CoreStorage, state_subscriber: S, store_persistent_states: bool) -> Self {
37        let last_key_block_utime = if store_persistent_states {
38            Self::find_last_key_block_utime(&storage)
39        } else {
40            0
41        };
42
43        Self {
44            inner: Arc::new(Inner {
45                block_saver: BlockSaver::new(storage.clone()),
46                storage,
47                state_subscriber,
48
49                store_persistent_states,
50                last_key_block_utime: AtomicU32::new(last_key_block_utime),
51                prev_state_task: Default::default(),
52            }),
53        }
54    }
55
56    fn find_last_key_block_utime(storage: &CoreStorage) -> u32 {
57        storage
58            .block_handle_storage()
59            .find_last_key_block()
60            .map_or(0, |handle| handle.gen_utime())
61    }
62
63    async fn prepare_block_impl(
64        &self,
65        cx: &BlockSubscriberContext,
66    ) -> Result<StateApplierPrepared> {
67        let _histogram = HistogramGuard::begin("tycho_core_state_applier_prepare_block_time");
68
69        let handle = self.inner.block_saver.save_block(cx).await?;
70
71        tracing::info!(
72            mc_block_id = %cx.mc_block_id.as_short_id(),
73            id = %cx.block.id(),
74            "preparing block",
75        );
76
77        let state_storage = self.inner.storage.shard_state_storage();
78
79        // Load/Apply state
80        let state = if handle.has_state() {
81            // Fast path when state is already applied
82            state_storage
83                .load_state(handle.ref_by_mc_seqno(), handle.id())
84                .await
85                .context("failed to load applied shard state")?
86        } else {
87            // Load previous states
88            let (prev_id, prev_id_alt) = cx
89                .block
90                .construct_prev_id()
91                .context("failed to construct prev id")?;
92
93            let (prev_root_cell, handles, old_split_at) = {
94                // NOTE: Use zero epoch here since we don't need to reuse these states.
95                let prev_state = state_storage
96                    .load_state(0, &prev_id)
97                    .await
98                    .context("failed to load prev shard state")?;
99
100                let old_split_at = split_aug_dict_raw(prev_state.state().load_accounts()?, 5)?
101                    .into_keys()
102                    .collect::<ahash::HashSet<_>>();
103
104                match &prev_id_alt {
105                    Some(prev_id) => {
106                        // NOTE: Use zero epoch here since we don't need to reuse these states.
107                        let prev_state_alt = state_storage
108                            .load_state(0, prev_id)
109                            .await
110                            .context("failed to load alt prev shard state")?;
111
112                        let cell = ShardStateStuff::construct_split_root(
113                            prev_state.root_cell().clone(),
114                            prev_state_alt.root_cell().clone(),
115                        )?;
116                        let left_handle = prev_state.ref_mc_state_handle().clone();
117                        let right_handle = prev_state_alt.ref_mc_state_handle().clone();
118                        (
119                            cell,
120                            RefMcStateHandles::Split(left_handle, right_handle),
121                            old_split_at,
122                        )
123                    }
124                    None => {
125                        let cell = prev_state.root_cell().clone();
126                        let handle = prev_state.ref_mc_state_handle().clone();
127                        (cell, RefMcStateHandles::Single(handle), old_split_at)
128                    }
129                }
130            };
131
132            // Apply state
133            self.compute_and_store_state_update(
134                &cx.block,
135                &handle,
136                prev_root_cell,
137                old_split_at,
138                handles.min_safe_handle().clone(),
139            )
140            .await?
141        };
142
143        Ok(StateApplierPrepared { state })
144    }
145
146    async fn handle_block_impl(
147        &self,
148        cx: &BlockSubscriberContext,
149        prepared: StateApplierPrepared,
150    ) -> Result<()> {
151        let _histogram = HistogramGuard::begin("tycho_core_state_applier_handle_block_time");
152
153        tracing::info!(
154            mc_block_id = %cx.mc_block_id.as_short_id(),
155            id = %cx.block.id(),
156            "handling block",
157        );
158
159        // Process state
160        let _histogram = HistogramGuard::begin("tycho_core_subscriber_handle_state_time");
161
162        let cx = StateSubscriberContext {
163            mc_block_id: cx.mc_block_id,
164            mc_is_key_block: cx.mc_is_key_block,
165            is_key_block: cx.is_key_block,
166            block: cx.block.clone(),
167            archive_data: cx.archive_data.clone(),
168            state: prepared.state,
169            delayed: cx.delayed.clone(),
170        };
171
172        let subscriber_fut = self.inner.state_subscriber.handle_state(&cx);
173
174        if self.inner.store_persistent_states {
175            let applier_fut = self.try_save_persistent_states(&cx);
176            match futures_util::future::join(applier_fut, subscriber_fut).await {
177                (Err(e), _) | (_, Err(e)) => Err(e),
178                _ => Ok(()),
179            }
180        } else {
181            subscriber_fut.await
182        }
183    }
184
185    async fn compute_and_store_state_update(
186        &self,
187        block: &BlockStuff,
188        handle: &BlockHandle,
189        prev_root: Cell,
190        split_at: ahash::HashSet<HashBytes>,
191        ref_mc_state_handle: RefMcStateHandle,
192    ) -> Result<ShardStateStuff> {
193        let labels = [("workchain", block.id().shard.workchain().to_string())];
194        let _histogram =
195            HistogramGuard::begin_with_labels("tycho_core_apply_block_time_high", &labels);
196
197        let update = block
198            .as_ref()
199            .load_state_update()
200            .context("Failed to load state update")?;
201
202        let apply_in_mem = HistogramGuard::begin("tycho_core_apply_block_in_mem_time_high");
203
204        let new_state = rayon_run(move || update.par_apply(&prev_root, &split_at))
205            .await
206            .context("Failed to apply state update")?;
207
208        apply_in_mem.finish();
209
210        let state_storage = self.inner.storage.shard_state_storage();
211
212        let new_state = ShardStateStuff::from_root(block.id(), new_state, ref_mc_state_handle)
213            .context("Failed to create new state")?;
214
215        state_storage
216            .store_state(handle, &new_state, StoreStateHint {
217                block_data_size: Some(block.data_size()),
218            })
219            .await
220            .context("Failed to store new state")?;
221
222        Ok(new_state)
223    }
224
225    async fn try_save_persistent_states(&self, cx: &StateSubscriberContext) -> Result<()> {
226        let this = self.inner.as_ref();
227
228        // Check if the previous persistent state save task has finished.
229        // This allows us to detect errors early without waiting for the next key block
230        let mut prev_task = this.prev_state_task.lock().await;
231        if let Some(task) = &mut *prev_task
232            && task.is_finished()
233        {
234            task.join().await?;
235        }
236        drop(prev_task);
237
238        if cx.is_key_block {
239            let block_info = cx.block.load_info()?;
240
241            let prev_utime = this
242                .last_key_block_utime
243                .swap(block_info.gen_utime, Ordering::Relaxed);
244
245            if BlockStuff::compute_is_persistent(block_info.gen_utime, prev_utime) {
246                let mut prev_task = this.prev_state_task.lock().await;
247                if let Some(task) = &mut *prev_task {
248                    task.join().await?;
249                }
250
251                let block = cx.block.clone();
252                let inner = self.inner.clone();
253                let state_handle = cx.state.ref_mc_state_handle().clone();
254
255                *prev_task = Some(StorePersistentStateTask {
256                    mc_seqno: cx.mc_block_id.seqno,
257                    handle: Some(tokio::spawn(async move {
258                        inner.save_persistent_states(block, state_handle).await
259                    })),
260                });
261            }
262        }
263
264        Ok(())
265    }
266}
267
268impl<S> Clone for ShardStateApplier<S> {
269    #[inline]
270    fn clone(&self) -> Self {
271        Self {
272            inner: self.inner.clone(),
273        }
274    }
275}
276
277impl<S> BlockSubscriber for ShardStateApplier<S>
278where
279    S: StateSubscriber,
280{
281    type Prepared = StateApplierPrepared;
282
283    type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
284    type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
285
286    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
287        Box::pin(self.prepare_block_impl(cx))
288    }
289
290    fn handle_block<'a>(
291        &'a self,
292        cx: &'a BlockSubscriberContext,
293        prepared: Self::Prepared,
294    ) -> Self::HandleBlockFut<'a> {
295        Box::pin(self.handle_block_impl(cx, prepared))
296    }
297}
298
299pub struct StateApplierPrepared {
300    state: ShardStateStuff,
301}
302
303enum RefMcStateHandles {
304    Split(RefMcStateHandle, RefMcStateHandle),
305    Single(RefMcStateHandle),
306}
307
308impl RefMcStateHandles {
309    fn min_safe_handle(&self) -> &RefMcStateHandle {
310        match self {
311            Self::Split(left, right) => left.min_safe(right),
312            Self::Single(handle) => handle,
313        }
314    }
315}
316
317struct Inner<S> {
318    storage: CoreStorage,
319    state_subscriber: S,
320    block_saver: BlockSaver,
321
322    store_persistent_states: bool,
323    last_key_block_utime: AtomicU32,
324    prev_state_task: tokio::sync::Mutex<Option<StorePersistentStateTask>>,
325}
326
327impl<S> Inner<S> {
328    async fn save_persistent_states(
329        &self,
330        mc_block: BlockStuff,
331        mc_state_handle: RefMcStateHandle,
332    ) -> Result<()> {
333        let block_handles = self.storage.block_handle_storage();
334
335        let Some(mc_block_handle) = block_handles.load_handle(mc_block.id()) else {
336            anyhow::bail!("masterchain block handle not found: {}", mc_block.id());
337        };
338        block_handles.set_block_persistent(&mc_block_handle);
339
340        let (state_result, queue_result) = tokio::join!(
341            self.save_persistent_shard_states(
342                mc_block_handle.clone(),
343                mc_block.clone(),
344                mc_state_handle
345            ),
346            self.save_persistent_queue_states(mc_block_handle.clone(), mc_block),
347        );
348        state_result?;
349        queue_result?;
350
351        self.storage
352            .persistent_state_storage()
353            .rotate_persistent_states(&mc_block_handle)
354            .await?;
355
356        metrics::counter!("tycho_core_ps_subscriber_saved_persistent_states_count").increment(1);
357        tracing::debug!("saved persistent state for {}", mc_block_handle.id());
358
359        Ok(())
360    }
361
362    async fn save_persistent_shard_states(
363        &self,
364        mc_block_handle: BlockHandle,
365        mc_block: BlockStuff,
366        mc_state_handle: RefMcStateHandle,
367    ) -> Result<()> {
368        let block_handles = self.storage.block_handle_storage();
369        let persistent_states = self.storage.persistent_state_storage();
370
371        let mc_seqno = mc_block_handle.id().seqno;
372        for entry in mc_block.load_custom()?.shards.latest_blocks() {
373            let block_id = entry?;
374            let Some(block_handle) = block_handles.load_handle(&block_id) else {
375                anyhow::bail!("top shard block handle not found: {block_id}");
376            };
377
378            // NOTE: We could have also called the `set_block_persistent` here, but we
379            //       only do this in the first part of the `save_persistent_queue_states`.
380
381            persistent_states
382                .store_shard_state(mc_seqno, &block_handle, mc_state_handle.clone())
383                .await?;
384        }
385
386        // NOTE: We intentionally store the masterchain state last to ensure that
387        //       the handle will live long enough. And this way we don't mislead
388        //       other nodes with the incomplete set of persistent states.
389        persistent_states
390            .store_shard_state(mc_seqno, &mc_block_handle, mc_state_handle)
391            .await?;
392
393        Ok(())
394    }
395
396    async fn save_persistent_queue_states(
397        &self,
398        mc_block_handle: BlockHandle,
399        mc_block: BlockStuff,
400    ) -> Result<()> {
401        let blocks = self.storage.block_storage();
402        let block_handles = self.storage.block_handle_storage();
403        let persistent_states = self.storage.persistent_state_storage();
404
405        let mut shard_block_handles = Vec::new();
406
407        for entry in mc_block.load_custom()?.shards.latest_blocks() {
408            let block_id = entry?;
409            let Some(block_handle) = block_handles.load_handle(&block_id) else {
410                anyhow::bail!("top shard block handle not found: {block_id}");
411            };
412
413            // NOTE: We set the flag only here because this part will be executed
414            //       first, without waiting for other states or queues to be saved.
415            block_handles.set_block_persistent(&block_handle);
416
417            shard_block_handles.push(block_handle);
418        }
419
420        // Store queue state for each shard
421        let mc_seqno = mc_block_handle.id().seqno;
422        for block_handle in shard_block_handles {
423            let block = blocks.load_block_data(&block_handle).await?;
424            persistent_states
425                .store_queue_state(mc_seqno, &block_handle, block)
426                .await?;
427        }
428
429        persistent_states
430            .store_queue_state(mc_seqno, &mc_block_handle, mc_block)
431            .await?;
432
433        Ok(())
434    }
435}
436
437struct StorePersistentStateTask {
438    mc_seqno: u32,
439    handle: Option<JoinHandle<Result<()>>>,
440}
441
442impl StorePersistentStateTask {
443    async fn join(&mut self) -> Result<()> {
444        // NOTE: Await on reference to make sure that the task is cancel safe
445        if let Some(handle) = &mut self.handle {
446            let result = handle
447                .await
448                .map_err(|e| {
449                    if e.is_panic() {
450                        std::panic::resume_unwind(e.into_panic());
451                    }
452                    anyhow::Error::from(e)
453                })
454                .and_then(std::convert::identity);
455
456            self.handle = None;
457
458            if let Err(e) = &result {
459                tracing::error!(
460                    mc_seqno = self.mc_seqno,
461                    "failed to save persistent state: {e:?}"
462                );
463            }
464
465            return result;
466        }
467
468        Ok(())
469    }
470
471    fn is_finished(&self) -> bool {
472        if let Some(handle) = &self.handle {
473            return handle.is_finished();
474        }
475
476        false
477    }
478}