layer_climb_core/signing/ibc/
relayer.rs

1/*
2High-level overview of the relayer:
3
4RELAYER
5
61. It will listen for IBC packets on all chains
72. There are two separate tasks: one for producing tasks (listening for IBC packets) and one for consuming tasks (updating clients and relaying packets)
83. These are not run in parallel, but rather in a join, so that it works in a single-threaded environment like browsers
94. Also, the task sender does _very little_ work, just formats the tasks, so it doesn't block the event loop anyway
10
11CACHE
12
131. "prepping the cache" creates all the clients, connections, and channels that the relayer will use
142. it will automatically try to update all clients and invalidate its cache as needed
153. basically, that means you can just "prep the cache" with the last prepped-cache and everything will work as expected
16
17*/
18mod builder;
19pub use builder::*;
20use std::{
21    collections::HashMap,
22    sync::{
23        atomic::{AtomicBool, AtomicI32, AtomicI64, Ordering},
24        Arc,
25    },
26    time::Duration,
27};
28
29use crate::{
30    events::{IbcPacket, IbcPacketKind},
31    ibc_types::{
32        IbcChannelId, IbcChannelOrdering, IbcChannelVersion, IbcClientId, IbcConnectionId,
33        IbcPortId,
34    },
35    prelude::*,
36    querier::stream::BlockEvents,
37};
38use futures::StreamExt;
39
40use serde::{Deserialize, Serialize};
41
42use super::{
43    IbcChannelHandshakeGasSimulationMultipliers, IbcConnectionHandshakeGasSimulationMultipliers,
44};
45
46pub struct IbcRelayer {
47    simulation_gas_multipliers: IbcRelayerGasSimulationMultipliers,
48    inner_log_ok: Arc<dyn Fn(String) + Send + Sync + 'static>,
49    inner_log_err: Arc<dyn Fn(String) + Send + Sync + 'static>,
50    client_infos: Vec<Arc<ClientInfo>>,
51}
52
53impl IbcRelayer {
54    pub async fn start(&self) -> Result<()> {
55        // at a high-level, we're streaming events in as they come in and kicking off tasks to handle them
56        let (task_sender, task_receiver) = futures::channel::mpsc::unbounded();
57
58        let resp = futures::future::join(
59            self.produce_tasks(task_sender),
60            self.consume_tasks(task_receiver),
61        )
62        .await;
63
64        match resp {
65            (Ok(()), _) => Ok(()),
66            (Err(e), _) => Err(e),
67        }
68    }
69
70    async fn produce_tasks(
71        &self,
72        task_sender: futures::channel::mpsc::UnboundedSender<Task>,
73    ) -> Result<()> {
74        // get a single stream for each client, regardless of whether it's side one or two
75        let mut unique_clients = HashMap::new();
76
77        for client_info in self.client_infos.iter() {
78            let chain_id_1 = client_info.signing_client_1.chain_id();
79            let chain_id_2 = client_info.signing_client_2.chain_id();
80
81            if !unique_clients.contains_key(chain_id_1) {
82                unique_clients.insert(
83                    chain_id_1.clone(),
84                    client_info.signing_client_1.querier.clone(),
85                );
86            }
87
88            if !unique_clients.contains_key(chain_id_2) {
89                unique_clients.insert(
90                    chain_id_2.clone(),
91                    client_info.signing_client_2.querier.clone(),
92                );
93            }
94        }
95
96        let mut streams = Vec::new();
97
98        for (chain_id, client) in unique_clients.iter() {
99            let stream = Box::pin(
100                client
101                    .clone()
102                    .stream_block_events(None)
103                    .await?
104                    .map(move |events| (chain_id, events)),
105            );
106
107            streams.push(stream);
108        }
109
110        // with all the streams combined, we can now select on them and process each event as it comes in
111        let mut combined_stream = futures::stream::select_all(streams);
112
113        while let Some((chain_id, events)) = combined_stream.next().await {
114            match events {
115                Ok(events) => {
116                    // encapsulate so we can log errors instead of die
117                    match self
118                        .produce_event_tasks(chain_id, events, &task_sender)
119                        .await
120                    {
121                        Ok(()) => {}
122                        Err(e) => {
123                            self.log_err(format!(
124                                "Error processing events for chain {chain_id}: {e:?}"
125                            ));
126                        }
127                    }
128                }
129                Err(e) => {
130                    self.log_err(format!("Error querying chain {chain_id}: {e:?}"));
131                }
132            }
133        }
134
135        Ok(())
136    }
137
138    async fn consume_tasks(
139        &self,
140        mut task_receiver: futures::channel::mpsc::UnboundedReceiver<Task>,
141    ) {
142        while let Some(task) = task_receiver.next().await {
143            // encapsulate so we can log errors instead of die
144            match self.consume_task(task).await {
145                Ok(()) => {}
146                Err(e) => {
147                    self.log_err(format!("Error handling task: {e:?}"));
148                }
149            }
150        }
151    }
152
153    // the main event loop for producing tasks
154    // this should be very quick and not block, as much as possible
155    // heavy lifting is done in the task itself
156    async fn produce_event_tasks(
157        &self,
158        chain_id: &ChainId,
159        block_events: BlockEvents,
160        task_sender: &futures::channel::mpsc::UnboundedSender<Task>,
161    ) -> Result<()> {
162        macro_rules! write_out {
163            ($($arg:tt)*) => {
164                self.log_ok(format!($($arg)*));
165            };
166        }
167
168        let BlockEvents { height, events } = block_events;
169
170        #[allow(clippy::collapsible_if)]
171        for client_info in self.client_infos.iter() {
172            if client_info.signing_client_1.chain_id() == chain_id {
173                if !client_info.update_1.get_is_auto_updating()
174                    && client_info.is_past_update_height(Side::One, height).await?
175                {
176                    client_info.update_1.set_is_auto_updating(true);
177                    task_sender.unbounded_send(Task::AutoUpdateClient {
178                        client_info: client_info.clone(),
179                        side: Side::One,
180                    })?;
181                }
182            } else if client_info.signing_client_2.chain_id() == chain_id {
183                if !client_info.update_2.get_is_auto_updating()
184                    && client_info.is_past_update_height(Side::Two, height).await?
185                {
186                    client_info.update_2.set_is_auto_updating(true);
187                    task_sender.unbounded_send(Task::AutoUpdateClient {
188                        client_info: client_info.clone(),
189                        side: Side::Two,
190                    })?;
191                }
192            }
193        }
194
195        let events = CosmosTxEvents::from(events.as_slice());
196
197        for event in events.events_iter() {
198            match IbcPacket::try_from(&event) {
199                Ok(packet) => {
200                    write_out!("[IBC EVENT] {:?}", packet.kind);
201                    task_sender.unbounded_send(Task::RelayPacket {
202                        client_packet: Box::new(
203                            self.get_client_packet(chain_id, packet)?
204                                .context("couldn't find client info for packet")?,
205                        ),
206                    })?;
207                }
208                Err(_) => {
209                    // non-ibc-event
210                }
211            }
212        }
213        Ok(())
214    }
215
216    // the main workhose for consuming tasks
217    async fn consume_task(&self, task: Task) -> Result<()> {
218        macro_rules! write_out {
219            ($($arg:tt)*) => {
220                self.log_ok(format!($($arg)*));
221            };
222        }
223        match task {
224            Task::AutoUpdateClient { client_info, side } => {
225                self.update_ibc_client(&client_info, side).await?;
226            }
227            Task::RelayPacket { client_packet } => {
228                let ClientPacket {
229                    client_info,
230                    side,
231                    packet,
232                } = *client_packet;
233
234                match packet.kind {
235                    IbcPacketKind::Send | IbcPacketKind::WriteAck => {
236                        // always write to the chain opposite the event source
237                        let (dst_signing_client, src_querier, dst_ibc_client_id, tx_builder) =
238                            match side {
239                                Side::One => (
240                                    client_info.signing_client_2.clone(),
241                                    client_info.signing_client_1.querier.clone(),
242                                    client_info.ibc_client_id_2.clone(),
243                                    client_info
244                                        .tx_builder(Side::Two, &self.simulation_gas_multipliers),
245                                ),
246                                Side::Two => (
247                                    client_info.signing_client_1.clone(),
248                                    client_info.signing_client_2.querier.clone(),
249                                    client_info.ibc_client_id_1.clone(),
250                                    client_info
251                                        .tx_builder(Side::One, &self.simulation_gas_multipliers),
252                                ),
253                            };
254
255                        // not sure why the order matters, change at your own peril
256                        // also, you might think you can skip over updates if it's recent enough.
257                        // maybe, good luck :)
258                        match side {
259                            Side::One => {
260                                self.update_ibc_client(&client_info, Side::One).await?;
261                                self.update_ibc_client(&client_info, Side::Two).await?;
262                            }
263                            Side::Two => {
264                                self.update_ibc_client(&client_info, Side::Two).await?;
265                                self.update_ibc_client(&client_info, Side::One).await?;
266                            }
267                        }
268
269                        if packet.kind == IbcPacketKind::Send {
270                            write_out!(
271                                "[RELAYING PACKET SEND] {}:{} -> {}:{}",
272                                src_querier.chain_config.chain_id,
273                                packet.src_port_id,
274                                dst_signing_client.chain_id(),
275                                packet.dst_port_id
276                            );
277                            dst_signing_client
278                                .ibc_packet_recv(
279                                    &dst_ibc_client_id,
280                                    packet,
281                                    &src_querier,
282                                    Some(tx_builder),
283                                )
284                                .await?;
285                        } else if packet.kind == IbcPacketKind::WriteAck {
286                            write_out!(
287                                "[RELAYING PACKET ACK] {}:{} -> {}:{}",
288                                src_querier.chain_config.chain_id,
289                                packet.src_port_id,
290                                dst_signing_client.chain_id(),
291                                packet.dst_port_id
292                            );
293                            dst_signing_client
294                                .ibc_packet_ack(
295                                    &dst_ibc_client_id,
296                                    packet.clone(),
297                                    &src_querier,
298                                    Some(tx_builder),
299                                )
300                                .await?;
301                        }
302                    }
303                    IbcPacketKind::Ack => {
304                        write_out!(
305                            "[PACKET ACK] CONFIRMED {} <-> {}",
306                            packet.src_port_id,
307                            packet.dst_port_id
308                        );
309                    }
310                    IbcPacketKind::Timeout => {
311                        // TODO - handle timeouts?
312                        write_out!(
313                            "[PACKET TIMEOUT] {} <-> {}",
314                            packet.src_port_id,
315                            packet.dst_port_id
316                        );
317                    }
318                    IbcPacketKind::Receive => {
319                        // TODO - handle receives?
320                        write_out!(
321                            "[PACKET RECEIVE] {} <-> {}",
322                            packet.src_port_id,
323                            packet.dst_port_id
324                        );
325                    }
326                }
327            }
328        }
329        Ok(())
330    }
331
332    async fn update_ibc_client(&self, client_info: &ClientInfo, side: Side) -> Result<()> {
333        let log_ok = self.inner_log_ok.clone();
334        client_info
335            .update(side, &self.simulation_gas_multipliers, move |s| log_ok(s))
336            .await
337    }
338
339    // get the client info for a given chain and packet
340    // also normalizes the packet so that it always points from src->dst
341    // from the perspective of the chain the event was detected on
342    fn get_client_packet(
343        &self,
344        chain_id: &ChainId,
345        mut packet: IbcPacket,
346    ) -> Result<Option<ClientPacket>> {
347        for client_info in self.client_infos.iter() {
348            let side = if chain_id == client_info.signing_client_1.chain_id() {
349                Some(Side::One)
350            } else if chain_id == client_info.signing_client_2.chain_id() {
351                Some(Side::Two)
352            } else {
353                None
354            };
355
356            // at this point, the packet is from the event, and src_connection_id is identical to dst_connection_id, doesn't matter which we check
357            if side.is_none()
358                || (packet.src_connection_id != client_info.connection_id_1
359                    && packet.src_connection_id != client_info.connection_id_2)
360            {
361                continue;
362            }
363
364            let side = side.unwrap();
365
366            for channel in client_info.channels.iter() {
367                if packet.src_channel_id == channel.channel_id_1
368                    && packet.dst_channel_id == channel.channel_id_2
369                    && packet.src_port_id == channel.port_id_1
370                    && packet.dst_port_id == channel.port_id_2
371                {
372                    // normalize the packet
373                    match side {
374                        Side::One => {
375                            packet.src_connection_id = client_info.connection_id_1.clone();
376                            packet.dst_connection_id = client_info.connection_id_2.clone();
377                            // no need to swap channel and port, already in the right order
378                        }
379                        Side::Two => {
380                            packet.src_connection_id = client_info.connection_id_2.clone();
381                            packet.dst_connection_id = client_info.connection_id_1.clone();
382                            std::mem::swap(&mut packet.src_port_id, &mut packet.dst_port_id);
383                            std::mem::swap(&mut packet.src_channel_id, &mut packet.dst_channel_id);
384                        }
385                    }
386                    return Ok(Some(ClientPacket {
387                        client_info: client_info.clone(),
388                        side,
389                        packet,
390                    }));
391                } else if packet.src_channel_id == channel.channel_id_2
392                    && packet.dst_channel_id == channel.channel_id_1
393                    && packet.src_port_id == channel.port_id_2
394                    && packet.dst_port_id == channel.port_id_1
395                {
396                    // normalize the packet
397                    match side {
398                        Side::One => {
399                            packet.src_connection_id = client_info.connection_id_1.clone();
400                            packet.dst_connection_id = client_info.connection_id_2.clone();
401                            std::mem::swap(&mut packet.src_port_id, &mut packet.dst_port_id);
402                            std::mem::swap(&mut packet.src_channel_id, &mut packet.dst_channel_id);
403                        }
404                        Side::Two => {
405                            packet.src_connection_id = client_info.connection_id_2.clone();
406                            packet.dst_connection_id = client_info.connection_id_1.clone();
407                            // no need to swap channel and port, already in the right order
408                        }
409                    }
410                    return Ok(Some(ClientPacket {
411                        client_info: client_info.clone(),
412                        side,
413                        packet,
414                    }));
415                }
416            }
417        }
418        Ok(None)
419    }
420
421    fn log_ok(&self, s: String) {
422        (self.inner_log_ok)(s);
423    }
424
425    fn log_err(&self, s: String) {
426        (self.inner_log_err)(s);
427    }
428}
429
430enum Task {
431    AutoUpdateClient {
432        client_info: Arc<ClientInfo>,
433        side: Side,
434    },
435    RelayPacket {
436        client_packet: Box<ClientPacket>,
437    },
438}
439
440#[derive(Debug, Clone, Copy, Eq, PartialEq)]
441enum Side {
442    One,
443    Two,
444}
445// unique clients for each network
446struct ClientInfo {
447    pub signing_client_1: SigningClient,
448    pub signing_client_2: SigningClient,
449    pub ibc_client_id_1: IbcClientId,
450    pub ibc_client_id_2: IbcClientId,
451    pub trusting_period_1: Duration,
452    pub trusting_period_2: Duration,
453    pub update_1: ClientUpdate,
454    pub update_2: ClientUpdate,
455    pub connection_id_1: IbcConnectionId,
456    pub connection_id_2: IbcConnectionId,
457    pub channels: Vec<ClientInfoChannel>,
458}
459
460#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
461pub struct ClientInfoChannel {
462    pub channel_id_1: IbcChannelId,
463    pub channel_id_2: IbcChannelId,
464    pub port_id_1: IbcPortId,
465    pub port_id_2: IbcPortId,
466    pub channel_version: IbcChannelVersion,
467    pub channel_ordering: IbcChannelOrdering,
468}
469
470impl ClientInfo {
471    fn signing_client(&self, side: Side) -> &SigningClient {
472        match side {
473            Side::One => &self.signing_client_1,
474            Side::Two => &self.signing_client_2,
475        }
476    }
477
478    fn tx_builder(
479        &self,
480        side: Side,
481        simulation_gas_multipliers: &IbcRelayerGasSimulationMultipliers,
482    ) -> TxBuilder<'_> {
483        let mut tx_builder = self.signing_client(side).tx_builder();
484        match side {
485            Side::One => {
486                if let Some(gas_simulation_multiplier) = simulation_gas_multipliers.update_client_1
487                {
488                    tx_builder.set_gas_simulate_multiplier(gas_simulation_multiplier);
489                }
490            }
491            Side::Two => {
492                if let Some(gas_simulation_multiplier) = simulation_gas_multipliers.update_client_2
493                {
494                    tx_builder.set_gas_simulate_multiplier(gas_simulation_multiplier);
495                }
496            }
497        }
498
499        tx_builder
500    }
501
502    fn counterparty_querier(&self, side: Side) -> &QueryClient {
503        match side {
504            Side::One => &self.signing_client_2.querier,
505            Side::Two => &self.signing_client_1.querier,
506        }
507    }
508
509    fn ibc_client_id(&self, side: Side) -> &IbcClientId {
510        match side {
511            Side::One => &self.ibc_client_id_1,
512            Side::Two => &self.ibc_client_id_2,
513        }
514    }
515
516    // 2/3 of trusting period
517    fn stale_duration(&self, side: Side) -> Duration {
518        match side {
519            Side::One => self
520                .trusting_period_1
521                .checked_div(3)
522                .unwrap_or_else(|| Duration::from_secs(0)),
523            Side::Two => self
524                .trusting_period_2
525                .checked_div(3)
526                .unwrap_or_else(|| Duration::from_secs(0)),
527        }
528    }
529
530    async fn set_next_update_time(&self, side: Side) -> Result<()> {
531        let stale_duration = self.stale_duration(side);
532
533        let mut update_time = self
534            .signing_client(side)
535            .querier
536            .block_header(None)
537            .await?
538            .time()
539            .context("No block time found")?;
540        update_time.seconds += i64::try_from(stale_duration.as_secs())?;
541        update_time.nanos += i32::try_from(stale_duration.subsec_nanos())?;
542
543        match side {
544            Side::One => {
545                self.update_1.set_next_time(update_time);
546                self.update_1.set_is_auto_updating(false);
547            }
548            Side::Two => {
549                self.update_2.set_next_time(update_time);
550                self.update_2.set_is_auto_updating(false);
551            }
552        }
553
554        Ok(())
555    }
556
557    async fn is_past_update_height(&self, side: Side, height: u64) -> Result<bool> {
558        let current_time = self
559            .signing_client(side)
560            .querier
561            .block_header(Some(height))
562            .await?
563            .time()
564            .context("No block time found")?;
565        let next_update_time = match side {
566            Side::One => self.update_1.get_next_time(),
567            Side::Two => self.update_2.get_next_time(),
568        };
569        Ok(current_time.seconds > next_update_time.seconds
570            || (current_time.seconds == next_update_time.seconds
571                && current_time.nanos > next_update_time.nanos))
572    }
573
574    async fn update(
575        &self,
576        side: Side,
577        simulation_gas_multipliers: &IbcRelayerGasSimulationMultipliers,
578        log_ok: impl Fn(String) + Send + Sync + 'static,
579    ) -> Result<()> {
580        let client = self.signing_client(side);
581        let ibc_client_id = self.ibc_client_id(side);
582        let counterparty_client_querier = self.counterparty_querier(side);
583
584        let height_before = *client
585            .querier
586            .ibc_client_state(ibc_client_id, None)
587            .await?
588            .latest_height
589            .as_ref()
590            .context("missing latest height")?;
591
592        log_ok(format!(
593            "[CLIENT UPDATE] starting {}:{} -> {} height: {}",
594            client.chain_id(),
595            ibc_client_id,
596            counterparty_client_querier.chain_config.chain_id,
597            height_before.revision_height,
598        ));
599        let tx_builder = self.tx_builder(side, simulation_gas_multipliers);
600        client
601            .ibc_update_client(
602                ibc_client_id,
603                counterparty_client_querier,
604                Some(height_before),
605                Some(tx_builder),
606            )
607            .await?;
608
609        let height_after = *client
610            .querier
611            .ibc_client_state(ibc_client_id, None)
612            .await?
613            .latest_height
614            .as_ref()
615            .context("missing latest height")?;
616
617        log_ok(format!(
618            "[CLIENT UPDATED] {}:{} -> {} height: {}",
619            client.chain_id(),
620            ibc_client_id,
621            counterparty_client_querier.chain_config.chain_id,
622            height_after.revision_height,
623        ));
624
625        self.set_next_update_time(side).await?;
626
627        Ok(())
628    }
629}
630
631#[derive(Default)]
632struct ClientUpdate {
633    pub next_time_seconds: AtomicI64,
634    pub next_time_subnanos: AtomicI32,
635    pub is_auto_updating: AtomicBool,
636}
637
638impl ClientUpdate {
639    pub fn get_next_time(&self) -> layer_climb_proto::Timestamp {
640        layer_climb_proto::Timestamp {
641            seconds: self.next_time_seconds.load(Ordering::SeqCst),
642            nanos: self.next_time_subnanos.load(Ordering::SeqCst),
643        }
644    }
645
646    pub fn set_next_time(&self, time: layer_climb_proto::Timestamp) {
647        self.next_time_seconds.store(time.seconds, Ordering::SeqCst);
648        self.next_time_subnanos.store(time.nanos, Ordering::SeqCst);
649    }
650
651    pub fn get_is_auto_updating(&self) -> bool {
652        self.is_auto_updating.load(Ordering::SeqCst)
653    }
654
655    pub fn set_is_auto_updating(&self, is_updating: bool) {
656        self.is_auto_updating.store(is_updating, Ordering::SeqCst);
657    }
658}
659
660struct ClientPacket {
661    client_info: Arc<ClientInfo>,
662    side: Side,
663    packet: IbcPacket,
664}
665
666#[derive(Serialize, Deserialize, Debug, Clone)]
667pub struct IbcRelayerGasSimulationMultipliers {
668    // if None, IbcConnectionHandshakeGasSimulationMultipliers::default() will be used
669    pub connection_handshake: Option<IbcConnectionHandshakeGasSimulationMultipliers>,
670    // if None, IbcChannelHandshakeGasSimulationMultipliers::default() will be used
671    pub channel_handshake: Option<IbcChannelHandshakeGasSimulationMultipliers>,
672    pub update_client_1: Option<f32>,
673    pub update_client_2: Option<f32>,
674    pub send_packet_1: Option<f32>,
675    pub send_packet_2: Option<f32>,
676}
677
678impl Default for IbcRelayerGasSimulationMultipliers {
679    fn default() -> Self {
680        Self {
681            connection_handshake: Some(IbcConnectionHandshakeGasSimulationMultipliers::default()),
682            channel_handshake: Some(IbcChannelHandshakeGasSimulationMultipliers::default()),
683            update_client_1: Some(2.5),
684            update_client_2: Some(2.5),
685            send_packet_1: Some(2.5),
686            send_packet_2: Some(2.5),
687        }
688    }
689}