tycho_core/block_strider/subscriber/
ps_subscriber.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicU32, Ordering};
3
4use anyhow::Result;
5use tycho_block_util::block::BlockStuff;
6use tycho_block_util::state::RefMcStateHandle;
7
8use crate::block_strider::{StateSubscriber, StateSubscriberContext};
9use crate::storage::{BlockHandle, CoreStorage};
10
11#[derive(Clone)]
13pub struct PsSubscriber {
14 inner: Arc<Inner>,
15}
16
17impl PsSubscriber {
18 pub fn new(storage: CoreStorage) -> Self {
19 let last_key_block_utime = storage
20 .block_handle_storage()
21 .find_last_key_block()
22 .map_or(0, |handle| handle.gen_utime());
23
24 Self {
25 inner: Arc::new(Inner {
26 last_key_block_utime: AtomicU32::new(last_key_block_utime),
27 storage,
28 }),
29 }
30 }
31}
32
33impl StateSubscriber for PsSubscriber {
34 type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;
35
36 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
37 if cx.is_key_block {
38 let block_info = cx.block.load_info().unwrap();
39
40 let prev_utime = self
41 .inner
42 .last_key_block_utime
43 .swap(block_info.gen_utime, Ordering::Relaxed);
44 let is_persistent = BlockStuff::compute_is_persistent(block_info.gen_utime, prev_utime);
45
46 if is_persistent && cx.block.id().seqno != 0 {
47 let block = cx.block.clone();
48 let inner = self.inner.clone();
49 let state_handle = cx.state.ref_mc_state_handle().clone();
50 tokio::spawn(async move {
51 if let Err(e) = inner.save_impl(block, state_handle).await {
52 tracing::error!("failed to save persistent states: {e}");
53 }
54 });
55 }
56 }
57
58 futures_util::future::ready(Ok(()))
59 }
60}
61
62struct Inner {
63 last_key_block_utime: AtomicU32,
64 storage: CoreStorage,
65}
66
67impl Inner {
68 async fn save_impl(
69 &self,
70 mc_block: BlockStuff,
71 mc_state_handle: RefMcStateHandle,
72 ) -> Result<()> {
73 let block_handles = self.storage.block_handle_storage();
74
75 let Some(mc_block_handle) = block_handles.load_handle(mc_block.id()) else {
76 anyhow::bail!("masterchain block handle not found: {}", mc_block.id());
77 };
78 block_handles.set_block_persistent(&mc_block_handle);
79
80 let (state_result, queue_result) = tokio::join!(
81 self.save_persistent_shard_states(
82 mc_block_handle.clone(),
83 mc_block.clone(),
84 mc_state_handle
85 ),
86 self.save_persistent_queue_states(mc_block_handle.clone(), mc_block),
87 );
88 state_result?;
89 queue_result?;
90
91 self.storage
92 .persistent_state_storage()
93 .rotate_persistent_states(&mc_block_handle)
94 .await?;
95
96 metrics::counter!("tycho_core_ps_subscriber_saved_persistent_states_count").increment(1);
97
98 Ok(())
99 }
100
101 async fn save_persistent_shard_states(
102 &self,
103 mc_block_handle: BlockHandle,
104 mc_block: BlockStuff,
105 mc_state_handle: RefMcStateHandle,
106 ) -> Result<()> {
107 let block_handles = self.storage.block_handle_storage();
108 let persistent_states = self.storage.persistent_state_storage();
109
110 let mc_seqno = mc_block_handle.id().seqno;
111 for entry in mc_block.load_custom()?.shards.latest_blocks() {
112 let block_id = entry?;
113 let Some(block_handle) = block_handles.load_handle(&block_id) else {
114 anyhow::bail!("top shard block handle not found: {block_id}");
115 };
116
117 persistent_states
121 .store_shard_state(mc_seqno, &block_handle, mc_state_handle.clone())
122 .await?;
123 }
124
125 persistent_states
129 .store_shard_state(mc_seqno, &mc_block_handle, mc_state_handle)
130 .await
131 }
132
133 async fn save_persistent_queue_states(
134 &self,
135 mc_block_handle: BlockHandle,
136 mc_block: BlockStuff,
137 ) -> Result<()> {
138 if mc_block_handle.id().seqno == 0 {
139 return Ok(());
141 }
142
143 let blocks = self.storage.block_storage();
144 let block_handles = self.storage.block_handle_storage();
145 let persistent_states = self.storage.persistent_state_storage();
146
147 let mut shard_block_handles = Vec::new();
148
149 for entry in mc_block.load_custom()?.shards.latest_blocks() {
150 let block_id = entry?;
151 if block_id.seqno == 0 {
152 continue;
154 }
155
156 let Some(block_handle) = block_handles.load_handle(&block_id) else {
157 anyhow::bail!("top shard block handle not found: {block_id}");
158 };
159
160 block_handles.set_block_persistent(&block_handle);
163
164 shard_block_handles.push(block_handle);
165 }
166
167 let mc_seqno = mc_block_handle.id().seqno;
169 for block_handle in shard_block_handles {
170 let block = blocks.load_block_data(&block_handle).await?;
171 persistent_states
172 .store_queue_state(mc_seqno, &block_handle, block)
173 .await?;
174 }
175
176 persistent_states
177 .store_queue_state(mc_seqno, &mc_block_handle, mc_block)
178 .await
179 }
180}