kona_interop/
graph.rs

1//! Interop [MessageGraph].
2
3use crate::{
4    RawMessagePayload,
5    errors::{MessageGraphError, MessageGraphResult},
6    message::{EnrichedExecutingMessage, extract_executing_messages},
7    traits::InteropProvider,
8};
9use alloc::vec::Vec;
10use alloy_consensus::{Header, Sealed};
11use alloy_primitives::{hex, keccak256, map::HashMap};
12use kona_genesis::RollupConfig;
13use kona_registry::ROLLUP_CONFIGS;
14use tracing::{info, warn};
15
16/// The message graph represents a set of blocks at a given timestamp and the interop
17/// dependencies between them.
18///
19/// This structure is used to determine whether or not any interop messages are invalid within the
20/// set of blocks within the graph. An "invalid message" is one that was relayed from one chain to
21/// another, but the original [MessageIdentifier] is not present within the graph or from a
22/// dependency referenced via the [InteropProvider] (or otherwise is invalid, such as being older
23/// than the message expiry window).
24///
25/// Message validity rules: <https://specs.optimism.io/interop/messaging.html#invalid-messages>
26///
27/// [MessageIdentifier]: crate::MessageIdentifier
28#[derive(Debug)]
29pub struct MessageGraph<'a, P> {
30    /// The edges within the graph.
31    ///
32    /// These are derived from the transactions within the blocks.
33    messages: Vec<EnrichedExecutingMessage>,
34    /// The data provider for the graph. Required for fetching headers, receipts and remote
35    /// messages within history during resolution.
36    provider: &'a P,
37    /// Backup rollup configs for each chain.
38    rollup_configs: &'a HashMap<u64, RollupConfig>,
39}
40
41impl<'a, P> MessageGraph<'a, P>
42where
43    P: InteropProvider,
44{
45    /// Derives the edges from the blocks within the graph by scanning all receipts within the
46    /// blocks and searching for [ExecutingMessage]s.
47    ///
48    /// [ExecutingMessage]: crate::ExecutingMessage
49    pub async fn derive(
50        blocks: &[(u64, Sealed<Header>)],
51        provider: &'a P,
52        rollup_configs: &'a HashMap<u64, RollupConfig>,
53    ) -> MessageGraphResult<Self, P> {
54        info!(
55            target: "message-graph",
56            "Deriving message graph from {} blocks.",
57            blocks.len()
58        );
59
60        let mut messages = Vec::with_capacity(blocks.len());
61        for (chain_id, header) in blocks.iter() {
62            let receipts = provider.receipts_by_hash(*chain_id, header.hash()).await?;
63            let executing_messages = extract_executing_messages(receipts.as_slice());
64
65            messages.extend(executing_messages.into_iter().map(|message| {
66                EnrichedExecutingMessage::new(message, *chain_id, header.timestamp)
67            }));
68        }
69
70        info!(
71            target: "message-graph",
72            "Derived {} executing messages from {} blocks.",
73            messages.len(),
74            blocks.len()
75        );
76        Ok(Self { messages, provider, rollup_configs })
77    }
78
79    /// Checks the validity of all messages within the graph.
80    pub async fn resolve(mut self) -> MessageGraphResult<(), P> {
81        info!(
82            target: "message-graph",
83            "Checking the message graph for invalid messages."
84        );
85
86        // Reduce the graph to remove all valid messages.
87        self.reduce().await?;
88
89        // Check if the graph is now empty. If not, there are invalid messages.
90        if !self.messages.is_empty() {
91            // Collect the chain IDs for all blocks containing invalid messages.
92            let mut bad_block_chain_ids =
93                self.messages.into_iter().map(|e| e.executing_chain_id).collect::<Vec<_>>();
94            bad_block_chain_ids.dedup_by(|a, b| a == b);
95
96            warn!(
97                target: "message-graph",
98                "Failed to reduce the message graph entirely. Invalid messages found in chains {}",
99                bad_block_chain_ids
100                    .iter()
101                    .map(|id| alloc::format!("{}", id))
102                    .collect::<Vec<_>>()
103                    .join(", ")
104            );
105
106            // Return an error with the chain IDs of the blocks containing invalid messages.
107            return Err(MessageGraphError::InvalidMessages(bad_block_chain_ids));
108        }
109
110        Ok(())
111    }
112
113    /// Attempts to remove as many edges from the graph as possible by resolving the dependencies
114    /// of each message. If a message cannot be resolved, it is considered invalid. After this
115    /// function is called, any outstanding messages are invalid.
116    async fn reduce(&mut self) -> MessageGraphResult<(), P> {
117        // Create a new vector to store invalid edges
118        let mut invalid_messages = Vec::with_capacity(self.messages.len());
119
120        // Prune all valid edges.
121        for message in core::mem::take(&mut self.messages) {
122            if let Err(e) = self.check_single_dependency(&message).await {
123                warn!(
124                    target: "message-graph",
125                    "Invalid ExecutingMessage found - relayed on chain {} with message hash {}.",
126                    message.executing_chain_id,
127                    hex::encode(message.inner.payloadHash)
128                );
129                warn!("Invalid message error: {}", e);
130                invalid_messages.push(message);
131            }
132        }
133
134        info!(
135            target: "message-graph",
136            "Successfully reduced the message graph. {} invalid messages found.",
137            invalid_messages.len()
138        );
139
140        // Replace the old edges with the filtered list
141        self.messages = invalid_messages;
142
143        Ok(())
144    }
145
146    /// Checks the dependency of a single [EnrichedExecutingMessage]. If the message's dependencies
147    /// are unavailable, the message is considered invalid and an [Err] is returned.
148    async fn check_single_dependency(
149        &self,
150        message: &EnrichedExecutingMessage,
151    ) -> MessageGraphResult<(), P> {
152        // ChainID Invariant: The chain id of the initiating message MUST be in the dependency set
153        // This is enforced implicitly by the graph constructor and the provider.
154
155        let initiating_chain_id = message.inner.identifier.chainId.saturating_to();
156        let initiating_timestamp = message.inner.identifier.timestamp.saturating_to::<u64>();
157
158        // Attempt to fetch the rollup config for the initiating chain from the registry. If the
159        // rollup config is not found, fall back to the local rollup configs.
160        let rollup_config = ROLLUP_CONFIGS
161            .get(&initiating_chain_id)
162            .or_else(|| self.rollup_configs.get(&initiating_chain_id))
163            .ok_or(MessageGraphError::MissingRollupConfig(initiating_chain_id))?;
164
165        // Timestamp invariant: The timestamp at the time of inclusion of the initiating message
166        // MUST be less than or equal to the timestamp of the executing message as well as greater
167        // than or equal to the Interop Start Timestamp.
168        if initiating_timestamp > message.executing_timestamp {
169            return Err(MessageGraphError::MessageInFuture(
170                message.executing_timestamp,
171                initiating_timestamp,
172            ));
173        } else if initiating_timestamp < rollup_config.hardforks.interop_time.unwrap_or_default() {
174            return Err(MessageGraphError::InvalidMessageTimestamp(
175                rollup_config.hardforks.interop_time.unwrap_or_default(),
176                initiating_timestamp,
177            ));
178        }
179
180        // Fetch the header & receipts for the message's claimed origin block on the remote chain.
181        let remote_header = self
182            .provider
183            .header_by_number(
184                message.inner.identifier.chainId.saturating_to(),
185                message.inner.identifier.blockNumber.saturating_to(),
186            )
187            .await?;
188        let remote_receipts = self
189            .provider
190            .receipts_by_number(
191                message.inner.identifier.chainId.saturating_to(),
192                message.inner.identifier.blockNumber.saturating_to(),
193            )
194            .await?;
195
196        // Find the log that matches the message's claimed log index. Note that the
197        // log index is global to the block, so we chain the full block's logs together
198        // to find it.
199        let remote_log = remote_receipts
200            .iter()
201            .flat_map(|receipt| receipt.logs())
202            .nth(message.inner.identifier.logIndex.saturating_to())
203            .ok_or(MessageGraphError::RemoteMessageNotFound(
204                message.inner.identifier.chainId.to(),
205                message.inner.payloadHash,
206            ))?;
207
208        // Validate the message's origin is correct.
209        if remote_log.address != message.inner.identifier.origin {
210            return Err(MessageGraphError::InvalidMessageOrigin(
211                message.inner.identifier.origin,
212                remote_log.address,
213            ));
214        }
215
216        // Validate that the message hash is correct.
217        let remote_message = RawMessagePayload::from(remote_log);
218        let remote_message_hash = keccak256(remote_message.as_ref());
219        if remote_message_hash != message.inner.payloadHash {
220            return Err(MessageGraphError::InvalidMessageHash(
221                message.inner.payloadHash,
222                remote_message_hash,
223            ));
224        }
225
226        // Validate that the timestamp of the block header containing the log is correct.
227        if remote_header.timestamp != initiating_timestamp {
228            return Err(MessageGraphError::InvalidMessageTimestamp(
229                initiating_timestamp,
230                remote_header.timestamp,
231            ));
232        }
233
234        Ok(())
235    }
236}
237
#[cfg(test)]
mod test {
    use super::MessageGraph;
    use crate::{MessageGraphError, test_util::SuperchainBuilder};
    use alloy_primitives::{Address, hex, keccak256, map::HashMap};
    use kona_genesis::{HardForkConfig, RollupConfig};

    /// Payload used for every initiating message in these tests.
    const MESSAGE: [u8; 4] = hex!("deadbeef");
    /// Chain ID used for the OP chain in the test superchain.
    const OP_CHAIN_ID: u64 = 10;
    /// Chain ID used for the Base chain in the test superchain.
    const BASE_CHAIN_ID: u64 = 8453;
    /// Chain ID whose rollup config is supplied through the local rollup configs in the interop
    /// activation test.
    const CUSTOM_CHAIN_ID: u64 = 0xDEAD;

    /// A single message initiated on OP and executed on Base resolves cleanly.
    #[tokio::test]
    async fn test_derive_and_reduce_simple_graph() {
        let mut builder = SuperchainBuilder::new(0);
        builder.chain(OP_CHAIN_ID).add_initiating_message(MESSAGE.into());
        builder.chain(BASE_CHAIN_ID).add_executing_message(keccak256(MESSAGE), 0, OP_CHAIN_ID, 0);
        let (headers, provider) = builder.build();

        let rollup_configs = HashMap::default();
        MessageGraph::derive(headers.as_slice(), &provider, &rollup_configs)
            .await
            .unwrap()
            .resolve()
            .await
            .unwrap();
    }

    /// Two chains that each execute a message initiated on the other resolve cleanly.
    #[tokio::test]
    async fn test_derive_and_reduce_cyclical_graph() {
        let mut builder = SuperchainBuilder::new(0);
        builder
            .chain(OP_CHAIN_ID)
            .add_initiating_message(MESSAGE.into())
            .add_executing_message(keccak256(MESSAGE), 1, BASE_CHAIN_ID, 0);
        builder
            .chain(BASE_CHAIN_ID)
            .add_executing_message(keccak256(MESSAGE), 0, OP_CHAIN_ID, 0)
            .add_initiating_message(MESSAGE.into());
        let (headers, provider) = builder.build();

        let rollup_configs = HashMap::default();
        MessageGraph::derive(headers.as_slice(), &provider, &rollup_configs)
            .await
            .unwrap()
            .resolve()
            .await
            .unwrap();
    }

    /// An executing message whose initiating message was never emitted is reported as invalid.
    #[tokio::test]
    async fn test_derive_and_reduce_simple_graph_remote_message_not_found() {
        let mut builder = SuperchainBuilder::new(0);
        builder.chain(OP_CHAIN_ID);
        builder.chain(BASE_CHAIN_ID).add_executing_message(keccak256(MESSAGE), 0, OP_CHAIN_ID, 0);
        let (headers, provider) = builder.build();

        let rollup_configs = HashMap::default();
        let outcome = MessageGraph::derive(headers.as_slice(), &provider, &rollup_configs)
            .await
            .unwrap()
            .resolve()
            .await;
        assert_eq!(outcome.unwrap_err(), MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID]));
    }

    /// An executing message that names the wrong initiating chain is reported as invalid.
    #[tokio::test]
    async fn test_derive_and_reduce_simple_graph_invalid_chain_id() {
        let mut builder = SuperchainBuilder::new(0);
        builder.chain(OP_CHAIN_ID).add_initiating_message(MESSAGE.into());
        builder.chain(BASE_CHAIN_ID).add_executing_message(
            keccak256(MESSAGE),
            0,
            BASE_CHAIN_ID,
            0,
        );
        let (headers, provider) = builder.build();

        let rollup_configs = HashMap::default();
        let outcome = MessageGraph::derive(headers.as_slice(), &provider, &rollup_configs)
            .await
            .unwrap()
            .resolve()
            .await;
        assert_eq!(outcome.unwrap_err(), MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID]));
    }

    /// A message initiated on a chain whose configured interop activation time is 50 is reported
    /// as invalid when executed in a superchain built with `SuperchainBuilder::new(0)`.
    #[tokio::test]
    async fn test_derive_and_reduce_simple_graph_message_before_interop_activation() {
        let mut builder = SuperchainBuilder::new(0);
        builder.chain(CUSTOM_CHAIN_ID).add_initiating_message(MESSAGE.into());
        builder.chain(BASE_CHAIN_ID).add_executing_message(
            keccak256(MESSAGE),
            0,
            CUSTOM_CHAIN_ID,
            0,
        );
        let (headers, provider) = builder.build();

        let interop_at_50 = RollupConfig {
            hardforks: HardForkConfig { interop_time: Some(50), ..Default::default() },
            ..Default::default()
        };
        let mut rollup_configs = HashMap::default();
        rollup_configs.insert(CUSTOM_CHAIN_ID, interop_at_50);

        let outcome = MessageGraph::derive(headers.as_slice(), &provider, &rollup_configs)
            .await
            .unwrap()
            .resolve()
            .await;
        assert_eq!(outcome.unwrap_err(), MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID]));
    }

    /// An executing message that points at a log index with no log is reported as invalid.
    #[tokio::test]
    async fn test_derive_and_reduce_simple_graph_invalid_log_index() {
        let mut builder = SuperchainBuilder::new(0);
        builder.chain(OP_CHAIN_ID).add_initiating_message(MESSAGE.into());
        builder.chain(BASE_CHAIN_ID).add_executing_message(keccak256(MESSAGE), 1, OP_CHAIN_ID, 0);
        let (headers, provider) = builder.build();

        let rollup_configs = HashMap::default();
        let outcome = MessageGraph::derive(headers.as_slice(), &provider, &rollup_configs)
            .await
            .unwrap()
            .resolve()
            .await;
        assert_eq!(outcome.unwrap_err(), MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID]));
    }

    /// An executing message whose payload hash differs from the initiating message's is reported
    /// as invalid.
    #[tokio::test]
    async fn test_derive_and_reduce_simple_graph_invalid_message_hash() {
        let mut builder = SuperchainBuilder::new(0);
        builder.chain(OP_CHAIN_ID).add_initiating_message(MESSAGE.into());
        builder.chain(BASE_CHAIN_ID).add_executing_message(
            keccak256(hex!("0badc0de")),
            0,
            OP_CHAIN_ID,
            0,
        );
        let (headers, provider) = builder.build();

        let rollup_configs = HashMap::default();
        let outcome = MessageGraph::derive(headers.as_slice(), &provider, &rollup_configs)
            .await
            .unwrap()
            .resolve()
            .await;
        assert_eq!(outcome.unwrap_err(), MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID]));
    }

    /// An executing message that claims a different origin address is reported as invalid.
    #[tokio::test]
    async fn test_derive_and_reduce_simple_graph_invalid_origin_address() {
        let mut builder = SuperchainBuilder::new(0);
        builder.chain(OP_CHAIN_ID).add_initiating_message(MESSAGE.into());
        builder.chain(BASE_CHAIN_ID).add_executing_message_with_origin(
            keccak256(MESSAGE),
            Address::left_padding_from(&[0x01]),
            0,
            OP_CHAIN_ID,
            0,
        );
        let (headers, provider) = builder.build();

        let rollup_configs = HashMap::default();
        let outcome = MessageGraph::derive(headers.as_slice(), &provider, &rollup_configs)
            .await
            .unwrap()
            .resolve()
            .await;
        assert_eq!(outcome.unwrap_err(), MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID]));
    }
}