1use super::*;
5use crate::blocks::TipsetKey;
6use crate::message::MessageRead as _;
7use crate::utils::ShallowClone as _;
8use ahash::{HashMap, HashMapExt as _};
9use anyhow::Context as _;
10use futures::{FutureExt, channel::oneshot, select};
11use tokio::sync::{RwLock, broadcast::error::RecvError};
12use tracing::warn;
13
14impl<DB> StateManager<DB>
15where
16 DB: Blockstore + Send + Sync + 'static,
17{
18 fn tipset_executed_message(
21 &self,
22 tipset: &Tipset,
23 message: &ChainMessage,
24 allow_replaced: bool,
25 ) -> Result<Option<Receipt>, Error> {
26 if tipset.epoch() == 0 {
27 return Ok(None);
28 }
29 let message_from_address = message.from();
30 let message_sequence = message.sequence();
31 let pts = self
33 .chain_index()
34 .load_required_tipset(tipset.parents())
35 .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?;
36 let messages = self
37 .cs
38 .messages_for_tipset(&pts)
39 .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?;
40 messages
41 .iter()
42 .enumerate()
43 .rev()
45 .filter(|(_, s)| {
46 s.sequence() == message_sequence
47 && s.from() == message_from_address
48 && s.equal_call(message)
49 })
50 .map(|(index, m)| {
51 if !allow_replaced && message.cid() != m.cid(){
55 Err(Error::Other(format!(
56 "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}",
57 message.cid(),
58 m.cid(),
59 message.sequence(),
60 message.from(),
61 )))
62 } else {
63 let block_header = tipset.block_headers().first();
64 crate::chain::get_parent_receipt(
65 self.blockstore(),
66 block_header,
67 index,
68 )
69 .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts)))
70 }
71 })
72 .next()
73 .unwrap_or(Ok(None))
74 }
75
76 fn check_search(
77 &self,
78 mut current: Tipset,
79 message: &ChainMessage,
80 lookback_max_epoch: ChainEpoch,
81 allow_replaced: bool,
82 ) -> Result<Option<(Tipset, Receipt)>, Error> {
83 let message_from_address = message.from();
84 let message_sequence = message.sequence();
85 let mut current_actor_state = self
86 .get_required_actor(&message_from_address, *current.parent_state())
87 .map_err(Error::state)?;
88 let message_from_id = self.lookup_required_id(&message_from_address, ¤t)?;
89
90 while current.epoch() >= lookback_max_epoch {
91 let parent_tipset = self
92 .chain_index()
93 .load_required_tipset(current.parents())
94 .map_err(|err| {
95 Error::Other(format!(
96 "failed to load tipset during msg wait searchback: {err:}"
97 ))
98 })?;
99
100 let parent_actor_state = self
101 .get_actor(&message_from_id, *parent_tipset.parent_state())
102 .map_err(|e| Error::State(e.to_string()))?;
103
104 if parent_actor_state.is_none()
105 || (current_actor_state.sequence > message_sequence
106 && parent_actor_state.as_ref().unwrap().sequence <= message_sequence)
107 {
108 let receipt = self
109 .tipset_executed_message(¤t, message, allow_replaced)?
110 .context("Failed to get receipt with tipset_executed_message")?;
111 return Ok(Some((current, receipt)));
112 }
113
114 if let Some(parent_actor_state) = parent_actor_state {
115 current = parent_tipset;
116 current_actor_state = parent_actor_state;
117 } else {
118 break;
119 }
120 }
121
122 Ok(None)
123 }
124
125 fn search_back_for_message(
127 &self,
128 current: Tipset,
129 message: &ChainMessage,
130 look_back_limit: Option<i64>,
131 allow_replaced: Option<bool>,
132 ) -> Result<Option<(Tipset, Receipt)>, Error> {
133 let current_epoch = current.epoch();
134 let allow_replaced = allow_replaced.unwrap_or(true);
135
136 let lookback_max_epoch = match look_back_limit {
138 Some(0) => return Ok(None),
140 Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0),
144 _ => 0,
146 };
147
148 self.check_search(current, message, lookback_max_epoch, allow_replaced)
149 }
150
151 pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result<Receipt, Error> {
153 let m = crate::chain::get_chain_message(self.blockstore(), &msg)
154 .map_err(|e| Error::Other(e.to_string()))?;
155 let message_receipt = self.tipset_executed_message(&tipset, &m, true)?;
156 if let Some(receipt) = message_receipt {
157 return Ok(receipt);
158 }
159
160 let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?;
161 let message_receipt = maybe_tuple
162 .ok_or_else(|| {
163 Error::Other("Could not get receipt from search back message".to_string())
164 })?
165 .1;
166 Ok(message_receipt)
167 }
168
169 pub async fn wait_for_message(
174 self: &Arc<Self>,
175 msg_cid: Cid,
176 confidence: i64,
177 look_back_limit: Option<ChainEpoch>,
178 allow_replaced: Option<bool>,
179 ) -> Result<(Option<Tipset>, Option<Receipt>), Error> {
180 let mut head_changes_rx = self.cs.subscribe_head_changes();
181 let (sender, mut receiver) = oneshot::channel::<()>();
182 let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
183 .map_err(|err| Error::Other(format!("failed to load message {err:}")))?;
184 let current_tipset = self.heaviest_tipset();
185 let maybe_message_receipt =
186 self.tipset_executed_message(¤t_tipset, &message, true)?;
187 if let Some(r) = maybe_message_receipt {
188 return Ok((Some(current_tipset.shallow_clone()), Some(r)));
189 }
190
191 let mut candidate_tipset: Option<Tipset> = None;
192 let mut candidate_receipt: Option<Receipt> = None;
193
194 let sm_cloned = self.shallow_clone();
195
196 let message_for_task = message.clone();
197 let height_of_head = current_tipset.epoch();
198 let task = tokio::task::spawn(async move {
199 let back_tuple = sm_cloned.search_back_for_message(
200 current_tipset,
201 &message_for_task,
202 look_back_limit,
203 allow_replaced,
204 )?;
205 sender
206 .send(())
207 .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?;
208 Ok::<_, Error>(back_tuple)
209 });
210
211 let reverts: Arc<RwLock<HashMap<TipsetKey, bool>>> = Arc::new(RwLock::new(HashMap::new()));
212 let block_revert = reverts.clone();
213 let sm_cloned = Arc::clone(self);
214
215 let mut subscriber_poll = tokio::task::spawn(async move {
217 loop {
218 match head_changes_rx.recv().await {
219 Ok(head_changes) => {
220 for tipset in head_changes.reverts {
221 if candidate_tipset
222 .as_ref()
223 .is_some_and(|candidate| candidate.key() == tipset.key())
224 {
225 candidate_tipset = None;
226 candidate_receipt = None;
227 }
228 }
229 for tipset in head_changes.applies {
230 if candidate_tipset
231 .as_ref()
232 .map(|s| tipset.epoch() >= s.epoch() + confidence)
233 .unwrap_or_default()
234 {
235 return Ok((candidate_tipset, candidate_receipt));
236 }
237 let poll_receiver = receiver.try_recv();
238 if let Ok(Some(_)) = poll_receiver {
239 block_revert
240 .write()
241 .await
242 .insert(tipset.key().to_owned(), true);
243 }
244
245 let maybe_receipt =
246 sm_cloned.tipset_executed_message(&tipset, &message, true)?;
247 if let Some(receipt) = maybe_receipt {
248 if confidence == 0 {
249 return Ok((Some(tipset), Some(receipt)));
250 }
251 candidate_tipset = Some(tipset);
252 candidate_receipt = Some(receipt)
253 }
254 }
255 }
256 Err(RecvError::Lagged(i)) => {
257 warn!(
258 "wait for message head change subscriber lagged, skipped {} events",
259 i
260 );
261 }
262 Err(RecvError::Closed) => break,
263 }
264 }
265 Ok((None, None))
266 })
267 .fuse();
268
269 let mut search_back_poll = tokio::task::spawn(async move {
271 let back_tuple = task.await.map_err(|e| {
272 Error::Other(format!("Could not search backwards for message {e}"))
273 })??;
274 if let Some((back_tipset, back_receipt)) = back_tuple {
275 let should_revert = *reverts
276 .read()
277 .await
278 .get(back_tipset.key())
279 .unwrap_or(&false);
280 let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence;
281 if !should_revert && larger_height_of_head {
282 return Ok::<_, Error>((Some(back_tipset), Some(back_receipt)));
283 }
284 return Ok((None, None));
285 }
286 Ok((None, None))
287 })
288 .fuse();
289
290 loop {
292 select! {
293 res = subscriber_poll => {
294 return res?
295 }
296 res = search_back_poll => {
297 if let Ok((Some(ts), Some(rct))) = res? {
298 return Ok((Some(ts), Some(rct)));
299 }
300 }
301 }
302 }
303 }
304
305 pub async fn search_for_message(
306 &self,
307 from: Option<Tipset>,
308 msg_cid: Cid,
309 look_back_limit: Option<i64>,
310 allow_replaced: Option<bool>,
311 ) -> Result<Option<(Tipset, Receipt)>, Error> {
312 let from = from.unwrap_or_else(|| self.heaviest_tipset());
313 let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
314 .map_err(|err| Error::Other(format!("failed to load message {err}")))?;
315 let current_tipset = self.heaviest_tipset();
316 let maybe_message_receipt =
317 self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?;
318 if let Some(r) = maybe_message_receipt {
319 Ok(Some((from, r)))
320 } else {
321 self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced)
322 }
323 }
324}