// tycho_core/block_strider/subscriber/ps_subscriber.rs

use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use anyhow::Result;
use futures_util::future::BoxFuture;
use tokio::task::JoinHandle;
use tycho_block_util::block::BlockStuff;
use tycho_block_util::state::RefMcStateHandle;

use crate::block_strider::{StateSubscriber, StateSubscriberContext};
use crate::storage::{BlockHandle, CoreStorage};

13/// Persistent state subscriber.
14#[derive(Clone)]
15pub struct PsSubscriber {
16    inner: Arc<Inner>,
17}
18
19impl PsSubscriber {
20    pub fn new(storage: CoreStorage) -> Self {
21        let last_key_block_utime = Self::find_last_key_block_utime(&storage);
22        Self {
23            inner: Arc::new(Inner {
24                last_key_block_utime: AtomicU32::new(last_key_block_utime),
25                storage,
26                prev_state_task: Default::default(),
27            }),
28        }
29    }
30
31    fn find_last_key_block_utime(storage: &CoreStorage) -> u32 {
32        storage
33            .block_handle_storage()
34            .find_last_key_block()
35            .map_or(0, |handle| handle.gen_utime())
36    }
37}
38
39impl StateSubscriber for PsSubscriber {
40    type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
41
42    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
43        Box::pin(self.inner.handle_state_impl(cx))
44    }
45}
46
47struct Inner {
48    last_key_block_utime: AtomicU32,
49    storage: CoreStorage,
50    prev_state_task: tokio::sync::Mutex<Option<StorePersistentStateTask>>,
51}
52
53impl Inner {
54    async fn handle_state_impl(self: &Arc<Self>, cx: &StateSubscriberContext) -> Result<()> {
55        // Check if the previous persistent state save task has finished.
56        // This allows us to detect errors early without waiting for the next key block
57        let mut prev_task = self.prev_state_task.lock().await;
58        if let Some(task) = &mut *prev_task
59            && task.is_finished()
60        {
61            task.join().await?;
62        }
63        drop(prev_task);
64
65        if cx.is_key_block {
66            let block_info = cx.block.load_info()?;
67
68            let prev_utime = self
69                .last_key_block_utime
70                .swap(block_info.gen_utime, Ordering::Relaxed);
71            let is_persistent = BlockStuff::compute_is_persistent(block_info.gen_utime, prev_utime);
72
73            if is_persistent && cx.block.id().seqno != 0 {
74                let mut prev_task = self.prev_state_task.lock().await;
75                if let Some(task) = &mut *prev_task {
76                    task.join().await?;
77                }
78
79                let block = cx.block.clone();
80                let inner = self.clone();
81                let state_handle = cx.state.ref_mc_state_handle().clone();
82
83                *prev_task = Some(StorePersistentStateTask {
84                    mc_seqno: cx.mc_block_id.seqno,
85                    handle: Some(tokio::spawn(async move {
86                        inner.save_impl(block, state_handle).await
87                    })),
88                });
89            }
90        }
91
92        Ok(())
93    }
94
95    async fn save_impl(
96        &self,
97        mc_block: BlockStuff,
98        mc_state_handle: RefMcStateHandle,
99    ) -> Result<()> {
100        let block_handles = self.storage.block_handle_storage();
101
102        let Some(mc_block_handle) = block_handles.load_handle(mc_block.id()) else {
103            anyhow::bail!("masterchain block handle not found: {}", mc_block.id());
104        };
105        block_handles.set_block_persistent(&mc_block_handle);
106
107        let (state_result, queue_result) = tokio::join!(
108            self.save_persistent_shard_states(
109                mc_block_handle.clone(),
110                mc_block.clone(),
111                mc_state_handle
112            ),
113            self.save_persistent_queue_states(mc_block_handle.clone(), mc_block),
114        );
115        state_result?;
116        queue_result?;
117
118        self.storage
119            .persistent_state_storage()
120            .rotate_persistent_states(&mc_block_handle)
121            .await?;
122
123        metrics::counter!("tycho_core_ps_subscriber_saved_persistent_states_count").increment(1);
124
125        Ok(())
126    }
127
128    async fn save_persistent_shard_states(
129        &self,
130        mc_block_handle: BlockHandle,
131        mc_block: BlockStuff,
132        mc_state_handle: RefMcStateHandle,
133    ) -> Result<()> {
134        let block_handles = self.storage.block_handle_storage();
135        let persistent_states = self.storage.persistent_state_storage();
136
137        let mc_seqno = mc_block_handle.id().seqno;
138        for entry in mc_block.load_custom()?.shards.latest_blocks() {
139            let block_id = entry?;
140            let Some(block_handle) = block_handles.load_handle(&block_id) else {
141                anyhow::bail!("top shard block handle not found: {block_id}");
142            };
143
144            // NOTE: We could have also called the `set_block_persistent` here, but we
145            //       only do this in the first part of the `save_persistent_queue_states`.
146
147            persistent_states
148                .store_shard_state(mc_seqno, &block_handle, mc_state_handle.clone())
149                .await?;
150        }
151
152        // NOTE: We intentionally store the masterchain state last to ensure that
153        //       the handle will live long enough. And this way we don't mislead
154        //       other nodes with the incomplete set of persistent states.
155        persistent_states
156            .store_shard_state(mc_seqno, &mc_block_handle, mc_state_handle)
157            .await?;
158
159        Ok(())
160    }
161
162    async fn save_persistent_queue_states(
163        &self,
164        mc_block_handle: BlockHandle,
165        mc_block: BlockStuff,
166    ) -> Result<()> {
167        if mc_block_handle.id().seqno == 0 {
168            // No queue states for zerostate.
169            return Ok(());
170        }
171
172        let blocks = self.storage.block_storage();
173        let block_handles = self.storage.block_handle_storage();
174        let persistent_states = self.storage.persistent_state_storage();
175
176        let mut shard_block_handles = Vec::new();
177
178        for entry in mc_block.load_custom()?.shards.latest_blocks() {
179            let block_id = entry?;
180            if block_id.seqno == 0 {
181                // No queue states for zerostate.
182                continue;
183            }
184
185            let Some(block_handle) = block_handles.load_handle(&block_id) else {
186                anyhow::bail!("top shard block handle not found: {block_id}");
187            };
188
189            // NOTE: We set the flag only here because this part will be executed
190            //       first, without waiting for other states or queues to be saved.
191            block_handles.set_block_persistent(&block_handle);
192
193            shard_block_handles.push(block_handle);
194        }
195
196        // Store queue state for each shard
197        let mc_seqno = mc_block_handle.id().seqno;
198        for block_handle in shard_block_handles {
199            let block = blocks.load_block_data(&block_handle).await?;
200            persistent_states
201                .store_queue_state(mc_seqno, &block_handle, block)
202                .await?;
203        }
204
205        persistent_states
206            .store_queue_state(mc_seqno, &mc_block_handle, mc_block)
207            .await?;
208
209        Ok(())
210    }
211}
212
213struct StorePersistentStateTask {
214    mc_seqno: u32,
215    handle: Option<JoinHandle<Result<()>>>,
216}
217
218impl StorePersistentStateTask {
219    async fn join(&mut self) -> Result<()> {
220        // NOTE: Await on reference to make sure that the task is cancel safe
221        if let Some(handle) = &mut self.handle {
222            let result = handle
223                .await
224                .map_err(|e| {
225                    if e.is_panic() {
226                        std::panic::resume_unwind(e.into_panic());
227                    }
228                    anyhow::Error::from(e)
229                })
230                .and_then(std::convert::identity);
231
232            self.handle = None;
233
234            if let Err(e) = &result {
235                tracing::error!(
236                    mc_seqno = self.mc_seqno,
237                    "failed to save persistent state: {e:?}"
238                );
239            }
240
241            return result;
242        }
243
244        Ok(())
245    }
246
247    fn is_finished(&self) -> bool {
248        if let Some(handle) = &self.handle {
249            return handle.is_finished();
250        }
251
252        false
253    }
254}