1mod builder;
19pub use builder::*;
20use std::{
21 collections::HashMap,
22 sync::{
23 atomic::{AtomicBool, AtomicI32, AtomicI64, Ordering},
24 Arc,
25 },
26 time::Duration,
27};
28
29use crate::{
30 events::{IbcPacket, IbcPacketKind},
31 ibc_types::{
32 IbcChannelId, IbcChannelOrdering, IbcChannelVersion, IbcClientId, IbcConnectionId,
33 IbcPortId,
34 },
35 prelude::*,
36 querier::stream::BlockEvents,
37};
38use futures::StreamExt;
39
40use serde::{Deserialize, Serialize};
41
42use super::{
43 IbcChannelHandshakeGasSimulationMultipliers, IbcConnectionHandshakeGasSimulationMultipliers,
44};
45
/// Relayer that watches block events on the configured chains and turns them
/// into client-update and packet-relay work.
pub struct IbcRelayer {
    /// Gas simulation multipliers handed to the transaction builders.
    simulation_gas_multipliers: IbcRelayerGasSimulationMultipliers,
    /// Sink for informational messages (used by `log_ok`).
    inner_log_ok: Arc<dyn Fn(String) + Send + Sync + 'static>,
    /// Sink for error messages (used by `log_err`).
    inner_log_err: Arc<dyn Fn(String) + Send + Sync + 'static>,
    /// One entry per pair of connected chains handled by this relayer.
    client_infos: Vec<Arc<ClientInfo>>,
}
52
53impl IbcRelayer {
54 pub async fn start(&self) -> Result<()> {
55 let (task_sender, task_receiver) = futures::channel::mpsc::unbounded();
57
58 let resp = futures::future::join(
59 self.produce_tasks(task_sender),
60 self.consume_tasks(task_receiver),
61 )
62 .await;
63
64 match resp {
65 (Ok(()), _) => Ok(()),
66 (Err(e), _) => Err(e),
67 }
68 }
69
70 async fn produce_tasks(
71 &self,
72 task_sender: futures::channel::mpsc::UnboundedSender<Task>,
73 ) -> Result<()> {
74 let mut unique_clients = HashMap::new();
76
77 for client_info in self.client_infos.iter() {
78 let chain_id_1 = client_info.signing_client_1.chain_id();
79 let chain_id_2 = client_info.signing_client_2.chain_id();
80
81 if !unique_clients.contains_key(chain_id_1) {
82 unique_clients.insert(
83 chain_id_1.clone(),
84 client_info.signing_client_1.querier.clone(),
85 );
86 }
87
88 if !unique_clients.contains_key(chain_id_2) {
89 unique_clients.insert(
90 chain_id_2.clone(),
91 client_info.signing_client_2.querier.clone(),
92 );
93 }
94 }
95
96 let mut streams = Vec::new();
97
98 for (chain_id, client) in unique_clients.iter() {
99 let stream = Box::pin(
100 client
101 .clone()
102 .stream_block_events(None)
103 .await?
104 .map(move |events| (chain_id, events)),
105 );
106
107 streams.push(stream);
108 }
109
110 let mut combined_stream = futures::stream::select_all(streams);
112
113 while let Some((chain_id, events)) = combined_stream.next().await {
114 match events {
115 Ok(events) => {
116 match self
118 .produce_event_tasks(chain_id, events, &task_sender)
119 .await
120 {
121 Ok(()) => {}
122 Err(e) => {
123 self.log_err(format!(
124 "Error processing events for chain {}: {:?}",
125 chain_id, e
126 ));
127 }
128 }
129 }
130 Err(e) => {
131 self.log_err(format!("Error querying chain {}: {:?}", chain_id, e));
132 }
133 }
134 }
135
136 Ok(())
137 }
138
139 async fn consume_tasks(
140 &self,
141 mut task_receiver: futures::channel::mpsc::UnboundedReceiver<Task>,
142 ) {
143 while let Some(task) = task_receiver.next().await {
144 match self.consume_task(task).await {
146 Ok(()) => {}
147 Err(e) => {
148 self.log_err(format!("Error handling task: {:?}", e));
149 }
150 }
151 }
152 }
153
154 async fn produce_event_tasks(
158 &self,
159 chain_id: &ChainId,
160 block_events: BlockEvents,
161 task_sender: &futures::channel::mpsc::UnboundedSender<Task>,
162 ) -> Result<()> {
163 macro_rules! write_out {
164 ($($arg:tt)*) => {
165 self.log_ok(format!($($arg)*));
166 };
167 }
168
169 let BlockEvents { height, events } = block_events;
170
171 #[allow(clippy::collapsible_if)]
172 for client_info in self.client_infos.iter() {
173 if client_info.signing_client_1.chain_id() == chain_id {
174 if !client_info.update_1.get_is_auto_updating()
175 && client_info.is_past_update_height(Side::One, height).await?
176 {
177 client_info.update_1.set_is_auto_updating(true);
178 task_sender.unbounded_send(Task::AutoUpdateClient {
179 client_info: client_info.clone(),
180 side: Side::One,
181 })?;
182 }
183 } else if client_info.signing_client_2.chain_id() == chain_id {
184 if !client_info.update_2.get_is_auto_updating()
185 && client_info.is_past_update_height(Side::Two, height).await?
186 {
187 client_info.update_2.set_is_auto_updating(true);
188 task_sender.unbounded_send(Task::AutoUpdateClient {
189 client_info: client_info.clone(),
190 side: Side::Two,
191 })?;
192 }
193 }
194 }
195
196 let events = CosmosTxEvents::from(events.as_slice());
197
198 for event in events.events_iter() {
199 match IbcPacket::try_from(&event) {
200 Ok(packet) => {
201 write_out!("[IBC EVENT] {:?}", packet.kind);
202 task_sender.unbounded_send(Task::RelayPacket {
203 client_packet: self
204 .get_client_packet(chain_id, packet)?
205 .context("couldn't find client info for packet")?,
206 })?;
207 }
208 Err(_) => {
209 }
211 }
212 }
213 Ok(())
214 }
215
216 async fn consume_task(&self, task: Task) -> Result<()> {
218 macro_rules! write_out {
219 ($($arg:tt)*) => {
220 self.log_ok(format!($($arg)*));
221 };
222 }
223 match task {
224 Task::AutoUpdateClient { client_info, side } => {
225 self.update_ibc_client(&client_info, side).await?;
226 }
227 Task::RelayPacket { client_packet } => {
228 let ClientPacket {
229 client_info,
230 side,
231 packet,
232 } = client_packet;
233
234 match packet.kind {
235 IbcPacketKind::Send | IbcPacketKind::WriteAck => {
236 let (dst_signing_client, src_querier, dst_ibc_client_id, tx_builder) =
238 match side {
239 Side::One => (
240 client_info.signing_client_2.clone(),
241 client_info.signing_client_1.querier.clone(),
242 client_info.ibc_client_id_2.clone(),
243 client_info
244 .tx_builder(Side::Two, &self.simulation_gas_multipliers),
245 ),
246 Side::Two => (
247 client_info.signing_client_1.clone(),
248 client_info.signing_client_2.querier.clone(),
249 client_info.ibc_client_id_1.clone(),
250 client_info
251 .tx_builder(Side::One, &self.simulation_gas_multipliers),
252 ),
253 };
254
255 match side {
259 Side::One => {
260 self.update_ibc_client(&client_info, Side::One).await?;
261 self.update_ibc_client(&client_info, Side::Two).await?;
262 }
263 Side::Two => {
264 self.update_ibc_client(&client_info, Side::Two).await?;
265 self.update_ibc_client(&client_info, Side::One).await?;
266 }
267 }
268
269 if packet.kind == IbcPacketKind::Send {
270 write_out!(
271 "[RELAYING PACKET SEND] {}:{} -> {}:{}",
272 src_querier.chain_config.chain_id,
273 packet.src_port_id,
274 dst_signing_client.chain_id(),
275 packet.dst_port_id
276 );
277 dst_signing_client
278 .ibc_packet_recv(
279 &dst_ibc_client_id,
280 packet,
281 &src_querier,
282 Some(tx_builder),
283 )
284 .await?;
285 } else if packet.kind == IbcPacketKind::WriteAck {
286 write_out!(
287 "[RELAYING PACKET ACK] {}:{} -> {}:{}",
288 src_querier.chain_config.chain_id,
289 packet.src_port_id,
290 dst_signing_client.chain_id(),
291 packet.dst_port_id
292 );
293 dst_signing_client
294 .ibc_packet_ack(
295 &dst_ibc_client_id,
296 packet.clone(),
297 &src_querier,
298 Some(tx_builder),
299 )
300 .await?;
301 }
302 }
303 IbcPacketKind::Ack => {
304 write_out!(
305 "[PACKET ACK] CONFIRMED {} <-> {}",
306 packet.src_port_id,
307 packet.dst_port_id
308 );
309 }
310 IbcPacketKind::Timeout => {
311 write_out!(
313 "[PACKET TIMEOUT] {} <-> {}",
314 packet.src_port_id,
315 packet.dst_port_id
316 );
317 }
318 IbcPacketKind::Receive => {
319 write_out!(
321 "[PACKET RECEIVE] {} <-> {}",
322 packet.src_port_id,
323 packet.dst_port_id
324 );
325 }
326 }
327 }
328 }
329 Ok(())
330 }
331
332 async fn update_ibc_client(&self, client_info: &ClientInfo, side: Side) -> Result<()> {
333 let log_ok = self.inner_log_ok.clone();
334 client_info
335 .update(side, &self.simulation_gas_multipliers, move |s| log_ok(s))
336 .await
337 }
338
339 fn get_client_packet(
343 &self,
344 chain_id: &ChainId,
345 mut packet: IbcPacket,
346 ) -> Result<Option<ClientPacket>> {
347 for client_info in self.client_infos.iter() {
348 let side = if chain_id == client_info.signing_client_1.chain_id() {
349 Some(Side::One)
350 } else if chain_id == client_info.signing_client_2.chain_id() {
351 Some(Side::Two)
352 } else {
353 None
354 };
355
356 if side.is_none()
358 || (packet.src_connection_id != client_info.connection_id_1
359 && packet.src_connection_id != client_info.connection_id_2)
360 {
361 continue;
362 }
363
364 let side = side.unwrap();
365
366 for channel in client_info.channels.iter() {
367 if packet.src_channel_id == channel.channel_id_1
368 && packet.dst_channel_id == channel.channel_id_2
369 && packet.src_port_id == channel.port_id_1
370 && packet.dst_port_id == channel.port_id_2
371 {
372 match side {
374 Side::One => {
375 packet.src_connection_id = client_info.connection_id_1.clone();
376 packet.dst_connection_id = client_info.connection_id_2.clone();
377 }
379 Side::Two => {
380 packet.src_connection_id = client_info.connection_id_2.clone();
381 packet.dst_connection_id = client_info.connection_id_1.clone();
382 std::mem::swap(&mut packet.src_port_id, &mut packet.dst_port_id);
383 std::mem::swap(&mut packet.src_channel_id, &mut packet.dst_channel_id);
384 }
385 }
386 return Ok(Some(ClientPacket {
387 client_info: client_info.clone(),
388 side,
389 packet,
390 }));
391 } else if packet.src_channel_id == channel.channel_id_2
392 && packet.dst_channel_id == channel.channel_id_1
393 && packet.src_port_id == channel.port_id_2
394 && packet.dst_port_id == channel.port_id_1
395 {
396 match side {
398 Side::One => {
399 packet.src_connection_id = client_info.connection_id_1.clone();
400 packet.dst_connection_id = client_info.connection_id_2.clone();
401 std::mem::swap(&mut packet.src_port_id, &mut packet.dst_port_id);
402 std::mem::swap(&mut packet.src_channel_id, &mut packet.dst_channel_id);
403 }
404 Side::Two => {
405 packet.src_connection_id = client_info.connection_id_2.clone();
406 packet.dst_connection_id = client_info.connection_id_1.clone();
407 }
409 }
410 return Ok(Some(ClientPacket {
411 client_info: client_info.clone(),
412 side,
413 packet,
414 }));
415 }
416 }
417 }
418 Ok(None)
419 }
420
421 fn log_ok(&self, s: String) {
422 (self.inner_log_ok)(s);
423 }
424
425 fn log_err(&self, s: String) {
426 (self.inner_log_err)(s);
427 }
428}
429
/// Unit of work passed from the task producer to the task consumer.
enum Task {
    /// Update the IBC client on `side` of `client_info`; queued when the
    /// side's scheduled update time has passed.
    AutoUpdateClient {
        client_info: Arc<ClientInfo>,
        side: Side,
    },
    /// Handle a packet event that was matched to a configured client pair.
    RelayPacket {
        client_packet: ClientPacket,
    },
}
439
/// Selects one of the two chains of a `ClientInfo`: `One` refers to the
/// `*_1` fields, `Two` to the `*_2` fields.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum Side {
    One,
    Two,
}
/// Everything the relayer tracks for one pair of connected chains. Fields
/// ending in `_1` belong to `Side::One`, fields ending in `_2` to `Side::Two`.
struct ClientInfo {
    /// Signing client (and, through `.querier`, query client) for each chain.
    pub signing_client_1: SigningClient,
    pub signing_client_2: SigningClient,
    /// Id of the IBC client that is updated on each chain.
    pub ibc_client_id_1: IbcClientId,
    pub ibc_client_id_2: IbcClientId,
    /// Trusting period per side; a third of it is used as the interval
    /// between scheduled client updates (see `stale_duration`).
    pub trusting_period_1: Duration,
    pub trusting_period_2: Duration,
    /// Update scheduling state per side.
    pub update_1: ClientUpdate,
    pub update_2: ClientUpdate,
    /// Connection id on each chain.
    pub connection_id_1: IbcConnectionId,
    pub connection_id_2: IbcConnectionId,
    /// Channels over this connection that packets are matched against.
    pub channels: Vec<ClientInfoChannel>,
}
459
/// Description of one channel between the two chains of a `ClientInfo`:
/// the channel and port id on each side plus the channel's version and
/// ordering.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct ClientInfoChannel {
    /// Channel id on side one.
    pub channel_id_1: IbcChannelId,
    /// Channel id on side two.
    pub channel_id_2: IbcChannelId,
    /// Port id on side one.
    pub port_id_1: IbcPortId,
    /// Port id on side two.
    pub port_id_2: IbcPortId,
    pub channel_version: IbcChannelVersion,
    pub channel_ordering: IbcChannelOrdering,
}
469
470impl ClientInfo {
471 fn signing_client(&self, side: Side) -> &SigningClient {
472 match side {
473 Side::One => &self.signing_client_1,
474 Side::Two => &self.signing_client_2,
475 }
476 }
477
478 fn tx_builder(
479 &self,
480 side: Side,
481 simulation_gas_multipliers: &IbcRelayerGasSimulationMultipliers,
482 ) -> TxBuilder {
483 let mut tx_builder = self.signing_client(side).tx_builder();
484 match side {
485 Side::One => {
486 if let Some(gas_simulation_multiplier) = simulation_gas_multipliers.update_client_1
487 {
488 tx_builder.set_gas_simulate_multiplier(gas_simulation_multiplier);
489 }
490 }
491 Side::Two => {
492 if let Some(gas_simulation_multiplier) = simulation_gas_multipliers.update_client_2
493 {
494 tx_builder.set_gas_simulate_multiplier(gas_simulation_multiplier);
495 }
496 }
497 }
498
499 tx_builder
500 }
501
502 fn counterparty_querier(&self, side: Side) -> &QueryClient {
503 match side {
504 Side::One => &self.signing_client_2.querier,
505 Side::Two => &self.signing_client_1.querier,
506 }
507 }
508
509 fn ibc_client_id(&self, side: Side) -> &IbcClientId {
510 match side {
511 Side::One => &self.ibc_client_id_1,
512 Side::Two => &self.ibc_client_id_2,
513 }
514 }
515
516 fn stale_duration(&self, side: Side) -> Duration {
518 match side {
519 Side::One => self
520 .trusting_period_1
521 .checked_div(3)
522 .unwrap_or_else(|| Duration::from_secs(0)),
523 Side::Two => self
524 .trusting_period_2
525 .checked_div(3)
526 .unwrap_or_else(|| Duration::from_secs(0)),
527 }
528 }
529
530 async fn set_next_update_time(&self, side: Side) -> Result<()> {
531 let stale_duration = self.stale_duration(side);
532
533 let mut update_time = self
534 .signing_client(side)
535 .querier
536 .block_header(None)
537 .await?
538 .time()
539 .context("No block time found")?;
540 update_time.seconds += i64::try_from(stale_duration.as_secs())?;
541 update_time.nanos += i32::try_from(stale_duration.subsec_nanos())?;
542
543 match side {
544 Side::One => {
545 self.update_1.set_next_time(update_time);
546 self.update_1.set_is_auto_updating(false);
547 }
548 Side::Two => {
549 self.update_2.set_next_time(update_time);
550 self.update_2.set_is_auto_updating(false);
551 }
552 }
553
554 Ok(())
555 }
556
557 async fn is_past_update_height(&self, side: Side, height: u64) -> Result<bool> {
558 let current_time = self
559 .signing_client(side)
560 .querier
561 .block_header(Some(height))
562 .await?
563 .time()
564 .context("No block time found")?;
565 let next_update_time = match side {
566 Side::One => self.update_1.get_next_time(),
567 Side::Two => self.update_2.get_next_time(),
568 };
569 Ok(current_time.seconds > next_update_time.seconds
570 || (current_time.seconds == next_update_time.seconds
571 && current_time.nanos > next_update_time.nanos))
572 }
573
574 async fn update(
575 &self,
576 side: Side,
577 simulation_gas_multipliers: &IbcRelayerGasSimulationMultipliers,
578 log_ok: impl Fn(String) + Send + Sync + 'static,
579 ) -> Result<()> {
580 let client = self.signing_client(side);
581 let ibc_client_id = self.ibc_client_id(side);
582 let counterparty_client_querier = self.counterparty_querier(side);
583
584 let height_before = *client
585 .querier
586 .ibc_client_state(ibc_client_id, None)
587 .await?
588 .latest_height
589 .as_ref()
590 .context("missing latest height")?;
591
592 log_ok(format!(
593 "[CLIENT UPDATE] starting {}:{} -> {} height: {}",
594 client.chain_id(),
595 ibc_client_id,
596 counterparty_client_querier.chain_config.chain_id,
597 height_before.revision_height,
598 ));
599 let tx_builder = self.tx_builder(side, simulation_gas_multipliers);
600 client
601 .ibc_update_client(
602 ibc_client_id,
603 counterparty_client_querier,
604 Some(height_before),
605 Some(tx_builder),
606 )
607 .await?;
608
609 let height_after = *client
610 .querier
611 .ibc_client_state(ibc_client_id, None)
612 .await?
613 .latest_height
614 .as_ref()
615 .context("missing latest height")?;
616
617 log_ok(format!(
618 "[CLIENT UPDATED] {}:{} -> {} height: {}",
619 client.chain_id(),
620 ibc_client_id,
621 counterparty_client_querier.chain_config.chain_id,
622 height_after.revision_height,
623 ));
624
625 self.set_next_update_time(side).await?;
626
627 Ok(())
628 }
629}
630
/// Update scheduling state for one side of a `ClientInfo`, kept in atomics
/// so it can be changed through a shared reference.
#[derive(Default)]
struct ClientUpdate {
    /// Seconds part of the next scheduled update time.
    pub next_time_seconds: AtomicI64,
    /// Nanoseconds part of the next scheduled update time.
    pub next_time_subnanos: AtomicI32,
    /// Set while an automatic update has been queued and not yet completed.
    pub is_auto_updating: AtomicBool,
}
637
638impl ClientUpdate {
639 pub fn get_next_time(&self) -> layer_climb_proto::Timestamp {
640 layer_climb_proto::Timestamp {
641 seconds: self.next_time_seconds.load(Ordering::SeqCst),
642 nanos: self.next_time_subnanos.load(Ordering::SeqCst),
643 }
644 }
645
646 pub fn set_next_time(&self, time: layer_climb_proto::Timestamp) {
647 self.next_time_seconds.store(time.seconds, Ordering::SeqCst);
648 self.next_time_subnanos.store(time.nanos, Ordering::SeqCst);
649 }
650
651 pub fn get_is_auto_updating(&self) -> bool {
652 self.is_auto_updating.load(Ordering::SeqCst)
653 }
654
655 pub fn set_is_auto_updating(&self, is_updating: bool) {
656 self.is_auto_updating.store(is_updating, Ordering::SeqCst);
657 }
658}
659
/// A packet together with the client pair it was matched to.
struct ClientPacket {
    /// Client pair whose channel matched the packet.
    client_info: Arc<ClientInfo>,
    /// Side of `client_info` whose chain emitted the packet event.
    side: Side,
    /// The packet, with its ids already adjusted by `get_client_packet`.
    packet: IbcPacket,
}
665
/// Optional gas simulation multipliers used by the relayer. A `None` field
/// means no multiplier is configured for that operation.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IbcRelayerGasSimulationMultipliers {
    /// Multipliers for the connection handshake.
    pub connection_handshake: Option<IbcConnectionHandshakeGasSimulationMultipliers>,
    /// Multipliers for the channel handshake.
    pub channel_handshake: Option<IbcChannelHandshakeGasSimulationMultipliers>,
    /// Applied by `ClientInfo::tx_builder` for `Side::One`.
    pub update_client_1: Option<f32>,
    /// Applied by `ClientInfo::tx_builder` for `Side::Two`.
    pub update_client_2: Option<f32>,
    // NOTE(review): the two `send_packet_*` fields are not read in the code
    // visible here — confirm where they are consumed.
    pub send_packet_1: Option<f32>,
    pub send_packet_2: Option<f32>,
}
677
678impl Default for IbcRelayerGasSimulationMultipliers {
679 fn default() -> Self {
680 Self {
681 connection_handshake: Some(IbcConnectionHandshakeGasSimulationMultipliers::default()),
682 channel_handshake: Some(IbcChannelHandshakeGasSimulationMultipliers::default()),
683 update_client_1: Some(2.5),
684 update_client_2: Some(2.5),
685 send_packet_1: Some(2.5),
686 send_packet_2: Some(2.5),
687 }
688 }
689}