tycho_core/block_strider/subscriber/
ps_subscriber.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU32, Ordering};
3
4use anyhow::Result;
5use tycho_block_util::block::BlockStuff;
6use tycho_block_util::state::RefMcStateHandle;
7
8use crate::block_strider::{StateSubscriber, StateSubscriberContext};
9use crate::storage::{BlockHandle, CoreStorage};
10
11/// Persistent state subscriber.
12#[derive(Clone)]
13pub struct PsSubscriber {
14    inner: Arc<Inner>,
15}
16
17impl PsSubscriber {
18    pub fn new(storage: CoreStorage) -> Self {
19        let last_key_block_utime = storage
20            .block_handle_storage()
21            .find_last_key_block()
22            .map_or(0, |handle| handle.gen_utime());
23
24        Self {
25            inner: Arc::new(Inner {
26                last_key_block_utime: AtomicU32::new(last_key_block_utime),
27                storage,
28            }),
29        }
30    }
31}
32
33impl StateSubscriber for PsSubscriber {
34    type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;
35
36    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
37        if cx.is_key_block {
38            let block_info = cx.block.load_info().unwrap();
39
40            let prev_utime = self
41                .inner
42                .last_key_block_utime
43                .swap(block_info.gen_utime, Ordering::Relaxed);
44            let is_persistent = BlockStuff::compute_is_persistent(block_info.gen_utime, prev_utime);
45
46            if is_persistent && cx.block.id().seqno != 0 {
47                let block = cx.block.clone();
48                let inner = self.inner.clone();
49                let state_handle = cx.state.ref_mc_state_handle().clone();
50                tokio::spawn(async move {
51                    if let Err(e) = inner.save_impl(block, state_handle).await {
52                        tracing::error!("failed to save persistent states: {e}");
53                    }
54                });
55            }
56        }
57
58        futures_util::future::ready(Ok(()))
59    }
60}
61
62struct Inner {
63    last_key_block_utime: AtomicU32,
64    storage: CoreStorage,
65}
66
67impl Inner {
68    async fn save_impl(
69        &self,
70        mc_block: BlockStuff,
71        mc_state_handle: RefMcStateHandle,
72    ) -> Result<()> {
73        let block_handles = self.storage.block_handle_storage();
74
75        let Some(mc_block_handle) = block_handles.load_handle(mc_block.id()) else {
76            anyhow::bail!("masterchain block handle not found: {}", mc_block.id());
77        };
78        block_handles.set_block_persistent(&mc_block_handle);
79
80        let (state_result, queue_result) = tokio::join!(
81            self.save_persistent_shard_states(
82                mc_block_handle.clone(),
83                mc_block.clone(),
84                mc_state_handle
85            ),
86            self.save_persistent_queue_states(mc_block_handle.clone(), mc_block),
87        );
88        state_result?;
89        queue_result?;
90
91        self.storage
92            .persistent_state_storage()
93            .rotate_persistent_states(&mc_block_handle)
94            .await?;
95
96        metrics::counter!("tycho_core_ps_subscriber_saved_persistent_states_count").increment(1);
97
98        Ok(())
99    }
100
101    async fn save_persistent_shard_states(
102        &self,
103        mc_block_handle: BlockHandle,
104        mc_block: BlockStuff,
105        mc_state_handle: RefMcStateHandle,
106    ) -> Result<()> {
107        let block_handles = self.storage.block_handle_storage();
108        let persistent_states = self.storage.persistent_state_storage();
109
110        let mc_seqno = mc_block_handle.id().seqno;
111        for entry in mc_block.load_custom()?.shards.latest_blocks() {
112            let block_id = entry?;
113            let Some(block_handle) = block_handles.load_handle(&block_id) else {
114                anyhow::bail!("top shard block handle not found: {block_id}");
115            };
116
117            // NOTE: We could have also called the `set_block_persistent` here, but we
118            //       only do this in the first part of the `save_persistent_queue_states`.
119
120            persistent_states
121                .store_shard_state(mc_seqno, &block_handle, mc_state_handle.clone())
122                .await?;
123        }
124
125        // NOTE: We intentionally store the masterchain state last to ensure that
126        //       the handle will live long enough. And this way we don't mislead
127        //       other nodes with the incomplete set of persistent states.
128        persistent_states
129            .store_shard_state(mc_seqno, &mc_block_handle, mc_state_handle)
130            .await
131    }
132
133    async fn save_persistent_queue_states(
134        &self,
135        mc_block_handle: BlockHandle,
136        mc_block: BlockStuff,
137    ) -> Result<()> {
138        if mc_block_handle.id().seqno == 0 {
139            // No queue states for zerostate.
140            return Ok(());
141        }
142
143        let blocks = self.storage.block_storage();
144        let block_handles = self.storage.block_handle_storage();
145        let persistent_states = self.storage.persistent_state_storage();
146
147        let mut shard_block_handles = Vec::new();
148
149        for entry in mc_block.load_custom()?.shards.latest_blocks() {
150            let block_id = entry?;
151            if block_id.seqno == 0 {
152                // No queue states for zerostate.
153                continue;
154            }
155
156            let Some(block_handle) = block_handles.load_handle(&block_id) else {
157                anyhow::bail!("top shard block handle not found: {block_id}");
158            };
159
160            // NOTE: We set the flag only here because this part will be executed
161            //       first, without waiting for other states or queues to be saved.
162            block_handles.set_block_persistent(&block_handle);
163
164            shard_block_handles.push(block_handle);
165        }
166
167        // Store queue state for each shard
168        let mc_seqno = mc_block_handle.id().seqno;
169        for block_handle in shard_block_handles {
170            let block = blocks.load_block_data(&block_handle).await?;
171            persistent_states
172                .store_queue_state(mc_seqno, &block_handle, block)
173                .await?;
174        }
175
176        persistent_states
177            .store_queue_state(mc_seqno, &mc_block_handle, mc_block)
178            .await
179    }
180}