Skip to main content

forest/state_manager/
message_search.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::*;
5use crate::blocks::TipsetKey;
6use crate::message::MessageRead as _;
7use crate::utils::ShallowClone as _;
8use ahash::{HashMap, HashMapExt as _};
9use anyhow::Context as _;
10use futures::{FutureExt, channel::oneshot, select};
11use tokio::sync::{RwLock, broadcast::error::RecvError};
12use tracing::warn;
13
14impl<DB> StateManager<DB>
15where
16    DB: Blockstore + Send + Sync + 'static,
17{
18    /// Check if tipset had executed the message, by loading the receipt based
19    /// on the index of the message in the block.
20    fn tipset_executed_message(
21        &self,
22        tipset: &Tipset,
23        message: &ChainMessage,
24        allow_replaced: bool,
25    ) -> Result<Option<Receipt>, Error> {
26        if tipset.epoch() == 0 {
27            return Ok(None);
28        }
29        let message_from_address = message.from();
30        let message_sequence = message.sequence();
31        // Load parent state.
32        let pts = self
33            .chain_index()
34            .load_required_tipset(tipset.parents())
35            .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?;
36        let messages = self
37            .cs
38            .messages_for_tipset(&pts)
39            .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?;
40        messages
41            .iter()
42            .enumerate()
43            // iterate in reverse because we going backwards through the chain
44            .rev()
45            .filter(|(_, s)| {
46                s.sequence() == message_sequence
47                    && s.from() == message_from_address
48                    && s.equal_call(message)
49            })
50            .map(|(index, m)| {
51                // A replacing message is a message with a different CID,
52                // any of Gas values, and different signature, but with all
53                // other parameters matching (source/destination, nonce, params, etc.)
54                if !allow_replaced && message.cid() != m.cid(){
55                    Err(Error::Other(format!(
56                        "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}",
57                        message.cid(),
58                        m.cid(),
59                        message.sequence(),
60                        message.from(),
61                    )))
62                } else {
63                    let block_header = tipset.block_headers().first();
64                    crate::chain::get_parent_receipt(
65                        self.blockstore(),
66                        block_header,
67                        index,
68                    )
69                        .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts)))
70                }
71            })
72            .next()
73            .unwrap_or(Ok(None))
74    }
75
76    fn check_search(
77        &self,
78        mut current: Tipset,
79        message: &ChainMessage,
80        lookback_max_epoch: ChainEpoch,
81        allow_replaced: bool,
82    ) -> Result<Option<(Tipset, Receipt)>, Error> {
83        let message_from_address = message.from();
84        let message_sequence = message.sequence();
85        let mut current_actor_state = self
86            .get_required_actor(&message_from_address, *current.parent_state())
87            .map_err(Error::state)?;
88        let message_from_id = self.lookup_required_id(&message_from_address, &current)?;
89
90        while current.epoch() >= lookback_max_epoch {
91            let parent_tipset = self
92                .chain_index()
93                .load_required_tipset(current.parents())
94                .map_err(|err| {
95                    Error::Other(format!(
96                        "failed to load tipset during msg wait searchback: {err:}"
97                    ))
98                })?;
99
100            let parent_actor_state = self
101                .get_actor(&message_from_id, *parent_tipset.parent_state())
102                .map_err(|e| Error::State(e.to_string()))?;
103
104            if parent_actor_state.is_none()
105                || (current_actor_state.sequence > message_sequence
106                    && parent_actor_state.as_ref().unwrap().sequence <= message_sequence)
107            {
108                let receipt = self
109                    .tipset_executed_message(&current, message, allow_replaced)?
110                    .context("Failed to get receipt with tipset_executed_message")?;
111                return Ok(Some((current, receipt)));
112            }
113
114            if let Some(parent_actor_state) = parent_actor_state {
115                current = parent_tipset;
116                current_actor_state = parent_actor_state;
117            } else {
118                break;
119            }
120        }
121
122        Ok(None)
123    }
124
125    /// Searches backwards through the chain for a message receipt.
126    fn search_back_for_message(
127        &self,
128        current: Tipset,
129        message: &ChainMessage,
130        look_back_limit: Option<i64>,
131        allow_replaced: Option<bool>,
132    ) -> Result<Option<(Tipset, Receipt)>, Error> {
133        let current_epoch = current.epoch();
134        let allow_replaced = allow_replaced.unwrap_or(true);
135
136        // Calculate the max lookback epoch (inclusive lower bound) for the search.
137        let lookback_max_epoch = match look_back_limit {
138            // No search: limit = 0 means search 0 epochs
139            Some(0) => return Ok(None),
140            // Limited search: calculate the inclusive lower bound, clamped to genesis
141            // Example: limit=5 at epoch=1000 → min_epoch=996, searches [996,1000] = 5 epochs
142            // Example: limit=2000 at epoch=1000 → min_epoch=0, searches [0,1000] = 1001 epochs (all available)
143            Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0),
144            // Search all the way to genesis (epoch 0)
145            _ => 0,
146        };
147
148        self.check_search(current, message, lookback_max_epoch, allow_replaced)
149    }
150
151    /// Returns a message receipt from a given tipset and message CID.
152    pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result<Receipt, Error> {
153        let m = crate::chain::get_chain_message(self.blockstore(), &msg)
154            .map_err(|e| Error::Other(e.to_string()))?;
155        let message_receipt = self.tipset_executed_message(&tipset, &m, true)?;
156        if let Some(receipt) = message_receipt {
157            return Ok(receipt);
158        }
159
160        let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?;
161        let message_receipt = maybe_tuple
162            .ok_or_else(|| {
163                Error::Other("Could not get receipt from search back message".to_string())
164            })?
165            .1;
166        Ok(message_receipt)
167    }
168
169    /// `WaitForMessage` blocks until a message appears on chain. It looks
170    /// backwards in the chain to see if this has already happened. It
171    /// guarantees that the message has been on chain for at least
172    /// confidence epochs without being reverted before returning.
173    pub async fn wait_for_message(
174        self: &Arc<Self>,
175        msg_cid: Cid,
176        confidence: i64,
177        look_back_limit: Option<ChainEpoch>,
178        allow_replaced: Option<bool>,
179    ) -> Result<(Option<Tipset>, Option<Receipt>), Error> {
180        let mut head_changes_rx = self.cs.subscribe_head_changes();
181        let (sender, mut receiver) = oneshot::channel::<()>();
182        let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
183            .map_err(|err| Error::Other(format!("failed to load message {err:}")))?;
184        let current_tipset = self.heaviest_tipset();
185        let maybe_message_receipt =
186            self.tipset_executed_message(&current_tipset, &message, true)?;
187        if let Some(r) = maybe_message_receipt {
188            return Ok((Some(current_tipset.shallow_clone()), Some(r)));
189        }
190
191        let mut candidate_tipset: Option<Tipset> = None;
192        let mut candidate_receipt: Option<Receipt> = None;
193
194        let sm_cloned = self.shallow_clone();
195
196        let message_for_task = message.clone();
197        let height_of_head = current_tipset.epoch();
198        let task = tokio::task::spawn(async move {
199            let back_tuple = sm_cloned.search_back_for_message(
200                current_tipset,
201                &message_for_task,
202                look_back_limit,
203                allow_replaced,
204            )?;
205            sender
206                .send(())
207                .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?;
208            Ok::<_, Error>(back_tuple)
209        });
210
211        let reverts: Arc<RwLock<HashMap<TipsetKey, bool>>> = Arc::new(RwLock::new(HashMap::new()));
212        let block_revert = reverts.clone();
213        let sm_cloned = Arc::clone(self);
214
215        // Wait for message to be included in head change.
216        let mut subscriber_poll = tokio::task::spawn(async move {
217            loop {
218                match head_changes_rx.recv().await {
219                    Ok(head_changes) => {
220                        for tipset in head_changes.reverts {
221                            if candidate_tipset
222                                .as_ref()
223                                .is_some_and(|candidate| candidate.key() == tipset.key())
224                            {
225                                candidate_tipset = None;
226                                candidate_receipt = None;
227                            }
228                        }
229                        for tipset in head_changes.applies {
230                            if candidate_tipset
231                                .as_ref()
232                                .map(|s| tipset.epoch() >= s.epoch() + confidence)
233                                .unwrap_or_default()
234                            {
235                                return Ok((candidate_tipset, candidate_receipt));
236                            }
237                            let poll_receiver = receiver.try_recv();
238                            if let Ok(Some(_)) = poll_receiver {
239                                block_revert
240                                    .write()
241                                    .await
242                                    .insert(tipset.key().to_owned(), true);
243                            }
244
245                            let maybe_receipt =
246                                sm_cloned.tipset_executed_message(&tipset, &message, true)?;
247                            if let Some(receipt) = maybe_receipt {
248                                if confidence == 0 {
249                                    return Ok((Some(tipset), Some(receipt)));
250                                }
251                                candidate_tipset = Some(tipset);
252                                candidate_receipt = Some(receipt)
253                            }
254                        }
255                    }
256                    Err(RecvError::Lagged(i)) => {
257                        warn!(
258                            "wait for message head change subscriber lagged, skipped {} events",
259                            i
260                        );
261                    }
262                    Err(RecvError::Closed) => break,
263                }
264            }
265            Ok((None, None))
266        })
267        .fuse();
268
269        // Search backwards for message.
270        let mut search_back_poll = tokio::task::spawn(async move {
271            let back_tuple = task.await.map_err(|e| {
272                Error::Other(format!("Could not search backwards for message {e}"))
273            })??;
274            if let Some((back_tipset, back_receipt)) = back_tuple {
275                let should_revert = *reverts
276                    .read()
277                    .await
278                    .get(back_tipset.key())
279                    .unwrap_or(&false);
280                let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence;
281                if !should_revert && larger_height_of_head {
282                    return Ok::<_, Error>((Some(back_tipset), Some(back_receipt)));
283                }
284                return Ok((None, None));
285            }
286            Ok((None, None))
287        })
288        .fuse();
289
290        // Await on first future to finish.
291        loop {
292            select! {
293                res = subscriber_poll => {
294                    return res?
295                }
296                res = search_back_poll => {
297                    if let Ok((Some(ts), Some(rct))) = res? {
298                        return Ok((Some(ts), Some(rct)));
299                    }
300                }
301            }
302        }
303    }
304
305    pub async fn search_for_message(
306        &self,
307        from: Option<Tipset>,
308        msg_cid: Cid,
309        look_back_limit: Option<i64>,
310        allow_replaced: Option<bool>,
311    ) -> Result<Option<(Tipset, Receipt)>, Error> {
312        let from = from.unwrap_or_else(|| self.heaviest_tipset());
313        let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
314            .map_err(|err| Error::Other(format!("failed to load message {err}")))?;
315        let current_tipset = self.heaviest_tipset();
316        let maybe_message_receipt =
317            self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?;
318        if let Some(r) = maybe_message_receipt {
319            Ok(Some((from, r)))
320        } else {
321            self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced)
322        }
323    }
324}