1use crate::{
4 RawMessagePayload,
5 errors::{MessageGraphError, MessageGraphResult},
6 message::{EnrichedExecutingMessage, extract_executing_messages},
7 traits::InteropProvider,
8};
9use alloc::vec::Vec;
10use alloy_consensus::{Header, Sealed};
11use alloy_primitives::{hex, keccak256, map::HashMap};
12use kona_genesis::RollupConfig;
13use kona_registry::ROLLUP_CONFIGS;
14use tracing::{info, warn};
15
/// A set of executing messages collected from a group of blocks, bundled with the data needed to
/// validate them.
///
/// Instances are created by [`MessageGraph::derive`] and validated by [`MessageGraph::resolve`].
#[derive(Debug)]
pub struct MessageGraph<'a, P> {
    /// Executing messages held by the graph. After reduction, only the messages that failed
    /// validation remain in this list.
    messages: Vec<EnrichedExecutingMessage>,
    /// Data source used to fetch the headers and receipts needed for validation.
    provider: &'a P,
    /// Caller-supplied rollup configurations keyed by chain ID. Consulted only when the chain is
    /// not present in the `ROLLUP_CONFIGS` registry.
    rollup_configs: &'a HashMap<u64, RollupConfig>,
}
40
41impl<'a, P> MessageGraph<'a, P>
42where
43 P: InteropProvider,
44{
45 pub async fn derive(
50 blocks: &[(u64, Sealed<Header>)],
51 provider: &'a P,
52 rollup_configs: &'a HashMap<u64, RollupConfig>,
53 ) -> MessageGraphResult<Self, P> {
54 info!(
55 target: "message-graph",
56 "Deriving message graph from {} blocks.",
57 blocks.len()
58 );
59
60 let mut messages = Vec::with_capacity(blocks.len());
61 for (chain_id, header) in blocks.iter() {
62 let receipts = provider.receipts_by_hash(*chain_id, header.hash()).await?;
63 let executing_messages = extract_executing_messages(receipts.as_slice());
64
65 messages.extend(executing_messages.into_iter().map(|message| {
66 EnrichedExecutingMessage::new(message, *chain_id, header.timestamp)
67 }));
68 }
69
70 info!(
71 target: "message-graph",
72 "Derived {} executing messages from {} blocks.",
73 messages.len(),
74 blocks.len()
75 );
76 Ok(Self { messages, provider, rollup_configs })
77 }
78
79 pub async fn resolve(mut self) -> MessageGraphResult<(), P> {
81 info!(
82 target: "message-graph",
83 "Checking the message graph for invalid messages."
84 );
85
86 self.reduce().await?;
88
89 if !self.messages.is_empty() {
91 let mut bad_block_chain_ids =
93 self.messages.into_iter().map(|e| e.executing_chain_id).collect::<Vec<_>>();
94 bad_block_chain_ids.dedup_by(|a, b| a == b);
95
96 warn!(
97 target: "message-graph",
98 "Failed to reduce the message graph entirely. Invalid messages found in chains {}",
99 bad_block_chain_ids
100 .iter()
101 .map(|id| alloc::format!("{}", id))
102 .collect::<Vec<_>>()
103 .join(", ")
104 );
105
106 return Err(MessageGraphError::InvalidMessages(bad_block_chain_ids));
108 }
109
110 Ok(())
111 }
112
113 async fn reduce(&mut self) -> MessageGraphResult<(), P> {
117 let mut invalid_messages = Vec::with_capacity(self.messages.len());
119
120 for message in core::mem::take(&mut self.messages) {
122 if let Err(e) = self.check_single_dependency(&message).await {
123 warn!(
124 target: "message-graph",
125 "Invalid ExecutingMessage found - relayed on chain {} with message hash {}.",
126 message.executing_chain_id,
127 hex::encode(message.inner.payloadHash)
128 );
129 warn!("Invalid message error: {}", e);
130 invalid_messages.push(message);
131 }
132 }
133
134 info!(
135 target: "message-graph",
136 "Successfully reduced the message graph. {} invalid messages found.",
137 invalid_messages.len()
138 );
139
140 self.messages = invalid_messages;
142
143 Ok(())
144 }
145
146 async fn check_single_dependency(
149 &self,
150 message: &EnrichedExecutingMessage,
151 ) -> MessageGraphResult<(), P> {
152 let initiating_chain_id = message.inner.identifier.chainId.saturating_to();
156 let initiating_timestamp = message.inner.identifier.timestamp.saturating_to::<u64>();
157
158 let rollup_config = ROLLUP_CONFIGS
161 .get(&initiating_chain_id)
162 .or_else(|| self.rollup_configs.get(&initiating_chain_id))
163 .ok_or(MessageGraphError::MissingRollupConfig(initiating_chain_id))?;
164
165 if initiating_timestamp > message.executing_timestamp {
169 return Err(MessageGraphError::MessageInFuture(
170 message.executing_timestamp,
171 initiating_timestamp,
172 ));
173 } else if initiating_timestamp < rollup_config.hardforks.interop_time.unwrap_or_default() {
174 return Err(MessageGraphError::InvalidMessageTimestamp(
175 rollup_config.hardforks.interop_time.unwrap_or_default(),
176 initiating_timestamp,
177 ));
178 }
179
180 let remote_header = self
182 .provider
183 .header_by_number(
184 message.inner.identifier.chainId.saturating_to(),
185 message.inner.identifier.blockNumber.saturating_to(),
186 )
187 .await?;
188 let remote_receipts = self
189 .provider
190 .receipts_by_number(
191 message.inner.identifier.chainId.saturating_to(),
192 message.inner.identifier.blockNumber.saturating_to(),
193 )
194 .await?;
195
196 let remote_log = remote_receipts
200 .iter()
201 .flat_map(|receipt| receipt.logs())
202 .nth(message.inner.identifier.logIndex.saturating_to())
203 .ok_or(MessageGraphError::RemoteMessageNotFound(
204 message.inner.identifier.chainId.to(),
205 message.inner.payloadHash,
206 ))?;
207
208 if remote_log.address != message.inner.identifier.origin {
210 return Err(MessageGraphError::InvalidMessageOrigin(
211 message.inner.identifier.origin,
212 remote_log.address,
213 ));
214 }
215
216 let remote_message = RawMessagePayload::from(remote_log);
218 let remote_message_hash = keccak256(remote_message.as_ref());
219 if remote_message_hash != message.inner.payloadHash {
220 return Err(MessageGraphError::InvalidMessageHash(
221 message.inner.payloadHash,
222 remote_message_hash,
223 ));
224 }
225
226 if remote_header.timestamp != initiating_timestamp {
228 return Err(MessageGraphError::InvalidMessageTimestamp(
229 initiating_timestamp,
230 remote_header.timestamp,
231 ));
232 }
233
234 Ok(())
235 }
236}
237
238#[cfg(test)]
239mod test {
240 use super::MessageGraph;
241 use crate::{MessageGraphError, test_util::SuperchainBuilder};
242 use alloy_primitives::{Address, hex, keccak256, map::HashMap};
243 use kona_genesis::{HardForkConfig, RollupConfig};
244
    /// Raw payload used as the initiating message in the test scenarios below.
    const MESSAGE: [u8; 4] = hex!("deadbeef");
    /// Chain ID used for the OP chain in the test scenarios.
    const OP_CHAIN_ID: u64 = 10;
    /// Chain ID used for the Base chain in the test scenarios.
    const BASE_CHAIN_ID: u64 = 8453;
248
249 #[tokio::test]
250 async fn test_derive_and_reduce_simple_graph() {
251 let mut superchain = SuperchainBuilder::new(0);
252
253 superchain.chain(OP_CHAIN_ID).add_initiating_message(MESSAGE.into());
254 superchain.chain(BASE_CHAIN_ID).add_executing_message(
255 keccak256(MESSAGE),
256 0,
257 OP_CHAIN_ID,
258 0,
259 );
260
261 let (headers, provider) = superchain.build();
262
263 let cfgs = HashMap::default();
264 let graph = MessageGraph::derive(headers.as_slice(), &provider, &cfgs).await.unwrap();
265 graph.resolve().await.unwrap();
266 }
267
268 #[tokio::test]
269 async fn test_derive_and_reduce_cyclical_graph() {
270 let mut superchain = SuperchainBuilder::new(0);
271
272 superchain.chain(OP_CHAIN_ID).add_initiating_message(MESSAGE.into()).add_executing_message(
273 keccak256(MESSAGE),
274 1,
275 BASE_CHAIN_ID,
276 0,
277 );
278 superchain
279 .chain(BASE_CHAIN_ID)
280 .add_executing_message(keccak256(MESSAGE), 0, OP_CHAIN_ID, 0)
281 .add_initiating_message(MESSAGE.into());
282
283 let (headers, provider) = superchain.build();
284
285 let cfgs = HashMap::default();
286 let graph = MessageGraph::derive(headers.as_slice(), &provider, &cfgs).await.unwrap();
287 graph.resolve().await.unwrap();
288 }
289
290 #[tokio::test]
291 async fn test_derive_and_reduce_simple_graph_remote_message_not_found() {
292 let mut superchain = SuperchainBuilder::new(0);
293
294 superchain.chain(OP_CHAIN_ID);
295 superchain.chain(BASE_CHAIN_ID).add_executing_message(
296 keccak256(MESSAGE),
297 0,
298 OP_CHAIN_ID,
299 0,
300 );
301
302 let (headers, provider) = superchain.build();
303
304 let cfgs = HashMap::default();
305 let graph = MessageGraph::derive(headers.as_slice(), &provider, &cfgs).await.unwrap();
306 assert_eq!(
307 graph.resolve().await.unwrap_err(),
308 MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID])
309 );
310 }
311
312 #[tokio::test]
313 async fn test_derive_and_reduce_simple_graph_invalid_chain_id() {
314 let mut superchain = SuperchainBuilder::new(0);
315
316 superchain.chain(OP_CHAIN_ID).add_initiating_message(MESSAGE.into());
317 superchain.chain(BASE_CHAIN_ID).add_executing_message(
318 keccak256(MESSAGE),
319 0,
320 BASE_CHAIN_ID,
321 0,
322 );
323
324 let (headers, provider) = superchain.build();
325
326 let cfgs = HashMap::default();
327 let graph = MessageGraph::derive(headers.as_slice(), &provider, &cfgs).await.unwrap();
328 assert_eq!(
329 graph.resolve().await.unwrap_err(),
330 MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID])
331 );
332 }
333
334 #[tokio::test]
335 async fn test_derive_and_reduce_simple_graph_message_before_interop_activation() {
336 let mut superchain = SuperchainBuilder::new(0);
337
338 superchain.chain(0xDEAD).add_initiating_message(MESSAGE.into());
339 superchain.chain(BASE_CHAIN_ID).add_executing_message(keccak256(MESSAGE), 0, 0xDEAD, 0);
340
341 let (headers, provider) = superchain.build();
342
343 let mut cfgs = HashMap::default();
344 cfgs.insert(
345 0xDEAD,
346 RollupConfig {
347 hardforks: HardForkConfig { interop_time: Some(50), ..Default::default() },
348 ..Default::default()
349 },
350 );
351 let graph = MessageGraph::derive(headers.as_slice(), &provider, &cfgs).await.unwrap();
352 assert_eq!(
353 graph.resolve().await.unwrap_err(),
354 MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID])
355 );
356 }
357
358 #[tokio::test]
359 async fn test_derive_and_reduce_simple_graph_invalid_log_index() {
360 let mut superchain = SuperchainBuilder::new(0);
361
362 superchain.chain(OP_CHAIN_ID).add_initiating_message(MESSAGE.into());
363 superchain.chain(BASE_CHAIN_ID).add_executing_message(
364 keccak256(MESSAGE),
365 1,
366 OP_CHAIN_ID,
367 0,
368 );
369
370 let (headers, provider) = superchain.build();
371
372 let cfgs = HashMap::default();
373 let graph = MessageGraph::derive(headers.as_slice(), &provider, &cfgs).await.unwrap();
374 assert_eq!(
375 graph.resolve().await.unwrap_err(),
376 MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID])
377 );
378 }
379
380 #[tokio::test]
381 async fn test_derive_and_reduce_simple_graph_invalid_message_hash() {
382 let mut superchain = SuperchainBuilder::new(0);
383
384 superchain.chain(OP_CHAIN_ID).add_initiating_message(MESSAGE.into());
385 superchain.chain(BASE_CHAIN_ID).add_executing_message(
386 keccak256(hex!("0badc0de")),
387 0,
388 OP_CHAIN_ID,
389 0,
390 );
391
392 let (headers, provider) = superchain.build();
393
394 let cfgs = HashMap::default();
395 let graph = MessageGraph::derive(headers.as_slice(), &provider, &cfgs).await.unwrap();
396 assert_eq!(
397 graph.resolve().await.unwrap_err(),
398 MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID])
399 );
400 }
401
402 #[tokio::test]
403 async fn test_derive_and_reduce_simple_graph_invalid_origin_address() {
404 let mut superchain = SuperchainBuilder::new(0);
405
406 superchain.chain(OP_CHAIN_ID).add_initiating_message(MESSAGE.into());
407 superchain.chain(BASE_CHAIN_ID).add_executing_message_with_origin(
408 keccak256(MESSAGE),
409 Address::left_padding_from(&[0x01]),
410 0,
411 OP_CHAIN_ID,
412 0,
413 );
414
415 let (headers, provider) = superchain.build();
416
417 let cfgs = HashMap::default();
418 let graph = MessageGraph::derive(headers.as_slice(), &provider, &cfgs).await.unwrap();
419 assert_eq!(
420 graph.resolve().await.unwrap_err(),
421 MessageGraphError::InvalidMessages(vec![BASE_CHAIN_ID])
422 );
423 }
424}