tycho_core/block_strider/subscriber/
ps_subscriber.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicU32, Ordering};
3
4use anyhow::Result;
5use futures_util::future::BoxFuture;
6use tokio::task::JoinHandle;
7use tycho_block_util::block::BlockStuff;
8use tycho_block_util::state::RefMcStateHandle;
9
10use crate::block_strider::{StateSubscriber, StateSubscriberContext};
11use crate::storage::{BlockHandle, CoreStorage};
12
13#[derive(Clone)]
15pub struct PsSubscriber {
16 inner: Arc<Inner>,
17}
18
19impl PsSubscriber {
20 pub fn new(storage: CoreStorage) -> Self {
21 let last_key_block_utime = Self::find_last_key_block_utime(&storage);
22 Self {
23 inner: Arc::new(Inner {
24 last_key_block_utime: AtomicU32::new(last_key_block_utime),
25 storage,
26 prev_state_task: Default::default(),
27 }),
28 }
29 }
30
31 fn find_last_key_block_utime(storage: &CoreStorage) -> u32 {
32 storage
33 .block_handle_storage()
34 .find_last_key_block()
35 .map_or(0, |handle| handle.gen_utime())
36 }
37}
38
39impl StateSubscriber for PsSubscriber {
40 type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;
41
42 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
43 Box::pin(self.inner.handle_state_impl(cx))
44 }
45}
46
47struct Inner {
48 last_key_block_utime: AtomicU32,
49 storage: CoreStorage,
50 prev_state_task: tokio::sync::Mutex<Option<StorePersistentStateTask>>,
51}
52
53impl Inner {
54 async fn handle_state_impl(self: &Arc<Self>, cx: &StateSubscriberContext) -> Result<()> {
55 let mut prev_task = self.prev_state_task.lock().await;
58 if let Some(task) = &mut *prev_task
59 && task.is_finished()
60 {
61 task.join().await?;
62 }
63 drop(prev_task);
64
65 if cx.is_key_block {
66 let block_info = cx.block.load_info()?;
67
68 let prev_utime = self
69 .last_key_block_utime
70 .swap(block_info.gen_utime, Ordering::Relaxed);
71 let is_persistent = BlockStuff::compute_is_persistent(block_info.gen_utime, prev_utime);
72
73 if is_persistent && cx.block.id().seqno != 0 {
74 let mut prev_task = self.prev_state_task.lock().await;
75 if let Some(task) = &mut *prev_task {
76 task.join().await?;
77 }
78
79 let block = cx.block.clone();
80 let inner = self.clone();
81 let state_handle = cx.state.ref_mc_state_handle().clone();
82
83 *prev_task = Some(StorePersistentStateTask {
84 mc_seqno: cx.mc_block_id.seqno,
85 handle: Some(tokio::spawn(async move {
86 inner.save_impl(block, state_handle).await
87 })),
88 });
89 }
90 }
91
92 Ok(())
93 }
94
95 async fn save_impl(
96 &self,
97 mc_block: BlockStuff,
98 mc_state_handle: RefMcStateHandle,
99 ) -> Result<()> {
100 let block_handles = self.storage.block_handle_storage();
101
102 let Some(mc_block_handle) = block_handles.load_handle(mc_block.id()) else {
103 anyhow::bail!("masterchain block handle not found: {}", mc_block.id());
104 };
105 block_handles.set_block_persistent(&mc_block_handle);
106
107 let (state_result, queue_result) = tokio::join!(
108 self.save_persistent_shard_states(
109 mc_block_handle.clone(),
110 mc_block.clone(),
111 mc_state_handle
112 ),
113 self.save_persistent_queue_states(mc_block_handle.clone(), mc_block),
114 );
115 state_result?;
116 queue_result?;
117
118 self.storage
119 .persistent_state_storage()
120 .rotate_persistent_states(&mc_block_handle)
121 .await?;
122
123 metrics::counter!("tycho_core_ps_subscriber_saved_persistent_states_count").increment(1);
124
125 Ok(())
126 }
127
128 async fn save_persistent_shard_states(
129 &self,
130 mc_block_handle: BlockHandle,
131 mc_block: BlockStuff,
132 mc_state_handle: RefMcStateHandle,
133 ) -> Result<()> {
134 let block_handles = self.storage.block_handle_storage();
135 let persistent_states = self.storage.persistent_state_storage();
136
137 let mc_seqno = mc_block_handle.id().seqno;
138 for entry in mc_block.load_custom()?.shards.latest_blocks() {
139 let block_id = entry?;
140 let Some(block_handle) = block_handles.load_handle(&block_id) else {
141 anyhow::bail!("top shard block handle not found: {block_id}");
142 };
143
144 persistent_states
148 .store_shard_state(mc_seqno, &block_handle, mc_state_handle.clone())
149 .await?;
150 }
151
152 persistent_states
156 .store_shard_state(mc_seqno, &mc_block_handle, mc_state_handle)
157 .await?;
158
159 Ok(())
160 }
161
162 async fn save_persistent_queue_states(
163 &self,
164 mc_block_handle: BlockHandle,
165 mc_block: BlockStuff,
166 ) -> Result<()> {
167 if mc_block_handle.id().seqno == 0 {
168 return Ok(());
170 }
171
172 let blocks = self.storage.block_storage();
173 let block_handles = self.storage.block_handle_storage();
174 let persistent_states = self.storage.persistent_state_storage();
175
176 let mut shard_block_handles = Vec::new();
177
178 for entry in mc_block.load_custom()?.shards.latest_blocks() {
179 let block_id = entry?;
180 if block_id.seqno == 0 {
181 continue;
183 }
184
185 let Some(block_handle) = block_handles.load_handle(&block_id) else {
186 anyhow::bail!("top shard block handle not found: {block_id}");
187 };
188
189 block_handles.set_block_persistent(&block_handle);
192
193 shard_block_handles.push(block_handle);
194 }
195
196 let mc_seqno = mc_block_handle.id().seqno;
198 for block_handle in shard_block_handles {
199 let block = blocks.load_block_data(&block_handle).await?;
200 persistent_states
201 .store_queue_state(mc_seqno, &block_handle, block)
202 .await?;
203 }
204
205 persistent_states
206 .store_queue_state(mc_seqno, &mc_block_handle, mc_block)
207 .await?;
208
209 Ok(())
210 }
211}
212
213struct StorePersistentStateTask {
214 mc_seqno: u32,
215 handle: Option<JoinHandle<Result<()>>>,
216}
217
218impl StorePersistentStateTask {
219 async fn join(&mut self) -> Result<()> {
220 if let Some(handle) = &mut self.handle {
222 let result = handle
223 .await
224 .map_err(|e| {
225 if e.is_panic() {
226 std::panic::resume_unwind(e.into_panic());
227 }
228 anyhow::Error::from(e)
229 })
230 .and_then(std::convert::identity);
231
232 self.handle = None;
233
234 if let Err(e) = &result {
235 tracing::error!(
236 mc_seqno = self.mc_seqno,
237 "failed to save persistent state: {e:?}"
238 );
239 }
240
241 return result;
242 }
243
244 Ok(())
245 }
246
247 fn is_finished(&self) -> bool {
248 if let Some(handle) = &self.handle {
249 return handle.is_finished();
250 }
251
252 false
253 }
254}