1use crate::{
4 MESSAGE_EXPIRY_WINDOW, RawMessagePayload,
5 errors::{MessageGraphError, MessageGraphResult},
6 message::{EnrichedExecutingMessage, extract_executing_messages},
7 traits::InteropProvider,
8};
9use alloc::{string::ToString, vec::Vec};
10use alloy_consensus::{Header, Sealed};
11use alloy_primitives::keccak256;
12use kona_genesis::RollupConfig;
13use kona_registry::{HashMap, ROLLUP_CONFIGS};
14use tracing::{info, warn};
15
16#[derive(Debug)]
29pub struct MessageGraph<'a, P> {
30 messages: Vec<EnrichedExecutingMessage>,
34 provider: &'a P,
37 rollup_configs: &'a HashMap<u64, RollupConfig>,
39}
40
41impl<'a, P> MessageGraph<'a, P>
42where
43 P: InteropProvider,
44{
45 pub async fn derive(
50 blocks: &HashMap<u64, Sealed<Header>>,
51 provider: &'a P,
52 rollup_configs: &'a HashMap<u64, RollupConfig>,
53 ) -> MessageGraphResult<Self, P> {
54 info!(
55 target: "message_graph",
56 num_chains = blocks.len(),
57 "Deriving message graph",
58 );
59
60 let mut messages = Vec::with_capacity(blocks.len());
61 for (chain_id, header) in blocks.iter() {
62 let receipts = provider.receipts_by_hash(*chain_id, header.hash()).await?;
63 let executing_messages = extract_executing_messages(receipts.as_slice());
64
65 messages.extend(executing_messages.into_iter().map(|message| {
66 EnrichedExecutingMessage::new(message, *chain_id, header.timestamp)
67 }));
68 }
69
70 info!(
71 target: "message_graph",
72 num_chains = blocks.len(),
73 num_messages = messages.len(),
74 "Derived message graph successfully",
75 );
76 Ok(Self { messages, provider, rollup_configs })
77 }
78
79 pub async fn resolve(self) -> MessageGraphResult<(), P> {
91 info!(
92 target: "message_graph",
93 "Checking the message graph for invalid messages"
94 );
95
96 let mut invalid_messages = HashMap::default();
98
99 for message in self.messages.iter() {
103 if let Err(e) = self.check_single_dependency(message).await {
104 warn!(
105 target: "message_graph",
106 executing_chain_id = message.executing_chain_id,
107 message_hash = ?message.inner.payloadHash,
108 err = %e,
109 "Invalid ExecutingMessage found",
110 );
111 invalid_messages.insert(message.executing_chain_id, e);
112 }
113 }
114
115 info!(
116 target: "message_graph",
117 num_invalid_messages = invalid_messages.len(),
118 "Successfully reduced the message graph",
119 );
120
121 if !invalid_messages.is_empty() {
123 warn!(
124 target: "message_graph",
125 bad_chain_ids = %invalid_messages
126 .keys()
127 .map(ToString::to_string)
128 .collect::<Vec<_>>()
129 .join(", "),
130 "Failed to reduce the message graph entirely",
131 );
132
133 return Err(MessageGraphError::InvalidMessages(invalid_messages));
135 }
136
137 Ok(())
138 }
139
140 async fn check_single_dependency(
144 &self,
145 message: &EnrichedExecutingMessage,
146 ) -> MessageGraphResult<(), P> {
147 let initiating_chain_id = message.inner.identifier.chainId.saturating_to();
151 let initiating_timestamp = message.inner.identifier.timestamp.saturating_to::<u64>();
152
153 let rollup_config = ROLLUP_CONFIGS
156 .get(&initiating_chain_id)
157 .or_else(|| self.rollup_configs.get(&initiating_chain_id))
158 .ok_or(MessageGraphError::MissingRollupConfig(initiating_chain_id))?;
159
160 if initiating_timestamp > message.executing_timestamp {
164 return Err(MessageGraphError::MessageInFuture {
165 max: message.executing_timestamp,
166 actual: initiating_timestamp,
167 });
168 } else if initiating_timestamp <
169 rollup_config.hardforks.interop_time.unwrap_or_default() + rollup_config.block_time
170 {
171 return Err(MessageGraphError::InitiatedTooEarly {
172 activation_time: rollup_config.hardforks.interop_time.unwrap_or_default(),
173 initiating_message_time: initiating_timestamp,
174 });
175 }
176
177 if initiating_timestamp < message.executing_timestamp.saturating_sub(MESSAGE_EXPIRY_WINDOW)
181 {
182 return Err(MessageGraphError::MessageExpired {
183 initiating_timestamp,
184 executing_timestamp: message.executing_timestamp,
185 });
186 }
187
188 let remote_header = self
190 .provider
191 .header_by_number(
192 message.inner.identifier.chainId.saturating_to(),
193 message.inner.identifier.blockNumber.saturating_to(),
194 )
195 .await?;
196 let remote_receipts = self
197 .provider
198 .receipts_by_number(
199 message.inner.identifier.chainId.saturating_to(),
200 message.inner.identifier.blockNumber.saturating_to(),
201 )
202 .await?;
203
204 let remote_log = remote_receipts
208 .iter()
209 .flat_map(|receipt| receipt.logs())
210 .nth(message.inner.identifier.logIndex.saturating_to())
211 .ok_or(MessageGraphError::RemoteMessageNotFound {
212 chain_id: message.inner.identifier.chainId.to(),
213 message_hash: message.inner.payloadHash,
214 })?;
215
216 if remote_log.address != message.inner.identifier.origin {
218 return Err(MessageGraphError::InvalidMessageOrigin {
219 expected: message.inner.identifier.origin,
220 actual: remote_log.address,
221 });
222 }
223
224 let remote_message = RawMessagePayload::from(remote_log);
226 let remote_message_hash = keccak256(remote_message.as_ref());
227 if remote_message_hash != message.inner.payloadHash {
228 return Err(MessageGraphError::InvalidMessageHash {
229 expected: message.inner.payloadHash,
230 actual: remote_message_hash,
231 });
232 }
233
234 if remote_header.timestamp != initiating_timestamp {
236 return Err(MessageGraphError::InvalidMessageTimestamp {
237 expected: initiating_timestamp,
238 actual: remote_header.timestamp,
239 });
240 }
241
242 Ok(())
243 }
244}
245
#[cfg(test)]
mod test {
    use super::{MESSAGE_EXPIRY_WINDOW, MessageGraph};
    use crate::{
        MessageGraphError,
        test_util::{ExecutingMessageBuilder, SuperchainBuilder},
    };
    use alloy_primitives::{Address, hex, keccak256};

    /// Payload used for every initiating message in these tests.
    const MOCK_MESSAGE: [u8; 4] = hex!("deadbeef");
    /// Chain that hosts the initiating message in the single-direction tests.
    const CHAIN_A_ID: u64 = 1;
    /// Chain that hosts the executing message in the single-direction tests.
    const CHAIN_B_ID: u64 = 2;

    /// Builds the given superchain, derives and resolves its message graph, and asserts that
    /// resolution fails with exactly one invalid message, reported for chain B and equal to
    /// `$expected`.
    ///
    /// This is a macro rather than a function so the provider and error types produced by
    /// `SuperchainBuilder::build` never have to be named here. It must be invoked from an
    /// `async` context.
    macro_rules! assert_only_chain_b_invalid {
        ($superchain:ident, $expected:expr $(,)?) => {{
            let (headers, cfgs, provider) = $superchain.build();

            let graph = MessageGraph::derive(&headers, &provider, &cfgs).await.unwrap();
            let MessageGraphError::InvalidMessages(invalid_messages) =
                graph.resolve().await.unwrap_err()
            else {
                panic!("Expected invalid messages")
            };

            assert_eq!(invalid_messages.len(), 1);
            assert_eq!(*invalid_messages.get(&CHAIN_B_ID).unwrap(), $expected);
        }};
    }

    /// Returns a superchain with chains A and B, both at timestamp 2 with a block time of 2 and
    /// interop active from timestamp 0.
    fn default_superchain() -> SuperchainBuilder {
        let mut superchain = SuperchainBuilder::new();
        superchain
            .chain(CHAIN_A_ID)
            .with_timestamp(2)
            .with_block_time(2)
            .with_interop_activation_time(0);
        superchain
            .chain(CHAIN_B_ID)
            .with_timestamp(2)
            .with_block_time(2)
            .with_interop_activation_time(0);

        superchain
    }

    #[tokio::test]
    async fn test_derive_and_resolve_simple_graph_no_cycles() {
        let mut superchain = default_superchain();

        let chain_a_time = superchain.chain(CHAIN_A_ID).header.timestamp;

        superchain.chain(CHAIN_A_ID).add_initiating_message(MOCK_MESSAGE.into());
        superchain.chain(CHAIN_B_ID).add_executing_message(
            ExecutingMessageBuilder::default()
                .with_message_hash(keccak256(MOCK_MESSAGE))
                .with_origin_chain_id(CHAIN_A_ID)
                .with_origin_timestamp(chain_a_time),
        );

        let (headers, cfgs, provider) = superchain.build();

        let graph = MessageGraph::derive(&headers, &provider, &cfgs).await.unwrap();
        graph.resolve().await.unwrap();
    }

    #[tokio::test]
    async fn test_derive_and_resolve_simple_graph_with_cycles() {
        let mut superchain = default_superchain();

        let chain_a_time = superchain.chain(CHAIN_A_ID).header.timestamp;
        let chain_b_time = superchain.chain(CHAIN_B_ID).header.timestamp;

        superchain
            .chain(CHAIN_A_ID)
            .add_initiating_message(MOCK_MESSAGE.into())
            .add_executing_message(
                ExecutingMessageBuilder::default()
                    .with_message_hash(keccak256(MOCK_MESSAGE))
                    .with_origin_chain_id(CHAIN_B_ID)
                    .with_origin_timestamp(chain_b_time),
            );
        superchain
            .chain(CHAIN_B_ID)
            .add_initiating_message(MOCK_MESSAGE.into())
            .add_executing_message(
                ExecutingMessageBuilder::default()
                    .with_message_hash(keccak256(MOCK_MESSAGE))
                    .with_origin_chain_id(CHAIN_A_ID)
                    .with_origin_timestamp(chain_a_time),
            );

        let (headers, cfgs, provider) = superchain.build();

        let graph = MessageGraph::derive(&headers, &provider, &cfgs).await.unwrap();
        graph.resolve().await.unwrap();
    }

    #[tokio::test]
    async fn test_derive_and_resolve_graph_message_in_future() {
        let mut superchain = default_superchain();

        let chain_a_time = superchain.chain(CHAIN_A_ID).header.timestamp;
        let chain_b_time = superchain.chain(CHAIN_B_ID).header.timestamp;

        superchain.chain(CHAIN_A_ID).add_initiating_message(MOCK_MESSAGE.into());
        superchain.chain(CHAIN_B_ID).add_executing_message(
            ExecutingMessageBuilder::default()
                .with_message_hash(keccak256(MOCK_MESSAGE))
                .with_origin_chain_id(CHAIN_A_ID)
                .with_origin_timestamp(chain_a_time + 1),
        );

        // The upper bound reported by the error is the executing block's timestamp.
        assert_only_chain_b_invalid!(
            superchain,
            MessageGraphError::MessageInFuture { max: chain_b_time, actual: chain_a_time + 1 },
        );
    }

    #[tokio::test]
    async fn test_derive_and_resolve_graph_initiating_before_interop() {
        let mut superchain = default_superchain();

        let chain_a_time = superchain.chain(CHAIN_A_ID).header.timestamp;

        superchain
            .chain(CHAIN_A_ID)
            .with_interop_activation_time(50)
            .add_initiating_message(MOCK_MESSAGE.into());
        superchain.chain(CHAIN_B_ID).add_executing_message(
            ExecutingMessageBuilder::default()
                .with_message_hash(keccak256(MOCK_MESSAGE))
                .with_origin_chain_id(CHAIN_A_ID)
                .with_origin_timestamp(chain_a_time),
        );

        assert_only_chain_b_invalid!(
            superchain,
            MessageGraphError::InitiatedTooEarly {
                activation_time: 50,
                initiating_message_time: chain_a_time,
            },
        );
    }

    #[tokio::test]
    async fn test_derive_and_resolve_graph_initiating_before_interop_unaligned_activation() {
        let mut superchain = default_superchain();

        let chain_a_time = superchain.chain(CHAIN_A_ID).header.timestamp;

        // Activation at 1 is not a multiple of the block time of 2.
        superchain
            .chain(CHAIN_A_ID)
            .with_interop_activation_time(1)
            .add_initiating_message(MOCK_MESSAGE.into());
        superchain.chain(CHAIN_B_ID).add_executing_message(
            ExecutingMessageBuilder::default()
                .with_message_hash(keccak256(MOCK_MESSAGE))
                .with_origin_chain_id(CHAIN_A_ID)
                .with_origin_timestamp(chain_a_time),
        );

        assert_only_chain_b_invalid!(
            superchain,
            MessageGraphError::InitiatedTooEarly {
                activation_time: 1,
                initiating_message_time: chain_a_time,
            },
        );
    }

    #[tokio::test]
    async fn test_derive_and_resolve_graph_initiating_at_interop_activation() {
        let mut superchain = default_superchain();

        let chain_a_time = superchain.chain(CHAIN_A_ID).header.timestamp;

        // A message initiated exactly at the activation timestamp is still too early.
        superchain
            .chain(CHAIN_A_ID)
            .with_interop_activation_time(chain_a_time)
            .add_initiating_message(MOCK_MESSAGE.into());
        superchain.chain(CHAIN_B_ID).add_executing_message(
            ExecutingMessageBuilder::default()
                .with_message_hash(keccak256(MOCK_MESSAGE))
                .with_origin_chain_id(CHAIN_A_ID)
                .with_origin_timestamp(chain_a_time),
        );

        assert_only_chain_b_invalid!(
            superchain,
            MessageGraphError::InitiatedTooEarly {
                activation_time: chain_a_time,
                initiating_message_time: chain_a_time,
            },
        );
    }

    #[tokio::test]
    async fn test_derive_and_resolve_graph_message_expired() {
        let mut superchain = default_superchain();

        let chain_a_time = superchain.chain(CHAIN_A_ID).header.timestamp;

        superchain.chain(CHAIN_A_ID).add_initiating_message(MOCK_MESSAGE.into());
        superchain
            .chain(CHAIN_B_ID)
            .with_timestamp(chain_a_time + MESSAGE_EXPIRY_WINDOW + 1)
            .add_executing_message(
                ExecutingMessageBuilder::default()
                    .with_message_hash(keccak256(MOCK_MESSAGE))
                    .with_origin_chain_id(CHAIN_A_ID)
                    .with_origin_timestamp(chain_a_time),
            );

        assert_only_chain_b_invalid!(
            superchain,
            MessageGraphError::MessageExpired {
                initiating_timestamp: chain_a_time,
                executing_timestamp: chain_a_time + MESSAGE_EXPIRY_WINDOW + 1,
            },
        );
    }

    #[tokio::test]
    async fn test_derive_and_resolve_graph_remote_message_not_found() {
        let mut superchain = default_superchain();

        let chain_a_time = superchain.chain(CHAIN_A_ID).header.timestamp;

        // Chain A never emits the initiating message that chain B refers to.
        superchain.chain(CHAIN_B_ID).add_executing_message(
            ExecutingMessageBuilder::default()
                .with_message_hash(keccak256(MOCK_MESSAGE))
                .with_origin_chain_id(CHAIN_A_ID)
                .with_origin_timestamp(chain_a_time),
        );

        assert_only_chain_b_invalid!(
            superchain,
            MessageGraphError::RemoteMessageNotFound {
                chain_id: CHAIN_A_ID,
                message_hash: keccak256(MOCK_MESSAGE),
            },
        );
    }

    #[tokio::test]
    async fn test_derive_and_resolve_graph_invalid_origin_address() {
        let mut superchain = default_superchain();
        let mock_address = Address::left_padding_from(&[0xFF]);

        let chain_a_time = superchain.chain(CHAIN_A_ID).header.timestamp;

        superchain.chain(CHAIN_A_ID).add_initiating_message(MOCK_MESSAGE.into());
        superchain.chain(CHAIN_B_ID).add_executing_message(
            ExecutingMessageBuilder::default()
                .with_message_hash(keccak256(MOCK_MESSAGE))
                .with_origin_chain_id(CHAIN_A_ID)
                .with_origin_address(mock_address)
                .with_origin_timestamp(chain_a_time),
        );

        assert_only_chain_b_invalid!(
            superchain,
            MessageGraphError::InvalidMessageOrigin {
                expected: mock_address,
                actual: Address::ZERO,
            },
        );
    }

    #[tokio::test]
    async fn test_derive_and_resolve_graph_invalid_message_hash() {
        let mut superchain = default_superchain();
        let mock_message_hash = keccak256([0xBE, 0xEF]);

        let chain_a_time = superchain.chain(CHAIN_A_ID).header.timestamp;

        superchain.chain(CHAIN_A_ID).add_initiating_message(MOCK_MESSAGE.into());
        superchain.chain(CHAIN_B_ID).add_executing_message(
            ExecutingMessageBuilder::default()
                .with_message_hash(mock_message_hash)
                .with_origin_chain_id(CHAIN_A_ID)
                .with_origin_timestamp(chain_a_time),
        );

        assert_only_chain_b_invalid!(
            superchain,
            MessageGraphError::InvalidMessageHash {
                expected: mock_message_hash,
                actual: keccak256(MOCK_MESSAGE),
            },
        );
    }

    #[tokio::test]
    async fn test_derive_and_resolve_graph_invalid_timestamp() {
        let mut superchain = default_superchain();

        let chain_a_time = superchain.chain(CHAIN_A_ID).with_timestamp(4).header.timestamp;

        superchain.chain(CHAIN_A_ID).add_initiating_message(MOCK_MESSAGE.into());
        superchain.chain(CHAIN_B_ID).with_timestamp(4).add_executing_message(
            ExecutingMessageBuilder::default()
                .with_message_hash(keccak256(MOCK_MESSAGE))
                .with_origin_chain_id(CHAIN_A_ID)
                .with_origin_timestamp(chain_a_time - 1),
        );

        assert_only_chain_b_invalid!(
            superchain,
            MessageGraphError::InvalidMessageTimestamp {
                expected: chain_a_time - 1,
                actual: chain_a_time,
            },
        );
    }
}