1use std::{
10 collections::{HashMap, HashSet},
11 sync::{Arc, Mutex},
12 time::Duration,
13};
14
15use crate::{
16 ibc_types::{
17 IbcChannelId, IbcChannelOrdering, IbcChannelVersion, IbcClientId, IbcConnectionId,
18 IbcPortId,
19 },
20 prelude::*,
21 signing::ibc::{IbcChannelHandshake, IbcConnectionHandshake},
22 transaction::SequenceStrategyKind,
23};
24use futures::{future::Either, pin_mut};
25use serde::{Deserialize, Serialize};
26
27use super::{
28 ClientInfo, ClientInfoChannel, ClientUpdate, IbcRelayer, IbcRelayerGasSimulationMultipliers,
29 Side,
30};
31
32pub struct IbcRelayerBuilder {
33 clients: HashMap<ChainId, SigningClient>,
34 paths: Vec<IbcPath>,
35 simulation_gas_multipliers: IbcRelayerGasSimulationMultipliers,
36 inner_log_ok: Arc<dyn Fn(String) + Send + Sync + 'static>,
37 inner_log_err: Arc<dyn Fn(String) + Send + Sync + 'static>,
38 cache: Arc<Mutex<IbcCache>>,
39 client_infos: Arc<Mutex<HashMap<IbcCacheChainKey, ClientInfo>>>,
40 client_infos_updating: Arc<Mutex<Vec<Arc<ClientInfo>>>>,
41}
42
43#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
44pub struct IbcPath {
45 pub chain_id_1: ChainId,
46 pub chain_id_2: ChainId,
47 pub port_id_1: IbcPortId,
48 pub port_id_2: IbcPortId,
49 pub channel_version: IbcChannelVersion,
50 pub channel_ordering: IbcChannelOrdering,
51}
52
53impl IbcRelayerBuilder {
54 pub fn new(
55 clients: Vec<SigningClient>,
56 mut paths: Vec<IbcPath>,
57 gas_simulation_multipliers: Option<IbcRelayerGasSimulationMultipliers>,
59 log_ok: impl Fn(String) + Send + Sync + 'static,
60 log_err: impl Fn(String) + Send + Sync + 'static,
61 ) -> Self {
62 for path in &mut paths {
65 if path.chain_id_1 > path.chain_id_2 {
66 std::mem::swap(&mut path.chain_id_1, &mut path.chain_id_2);
67 std::mem::swap(&mut path.port_id_1, &mut path.port_id_2);
68 }
69 }
70 let mut found = HashSet::new();
71 paths.retain(|p| found.insert(p.clone()));
72
73 Self {
74 inner_log_ok: Arc::new(log_ok),
75 inner_log_err: Arc::new(log_err),
76 clients: clients
77 .into_iter()
78 .map(|c| (c.chain_id().clone(), c))
79 .collect(),
80 paths,
81 simulation_gas_multipliers: gas_simulation_multipliers.unwrap_or_default(),
82 cache: Arc::new(Mutex::new(IbcCache::default())),
83 client_infos: Arc::new(Mutex::new(HashMap::new())),
84 client_infos_updating: Arc::new(Mutex::new(Vec::new())),
85 }
86 }
87
88 pub async fn build(self) -> Result<IbcRelayer> {
90 let client_infos: Vec<Arc<ClientInfo>> =
91 std::mem::take(&mut *self.client_infos.lock().unwrap())
92 .into_values()
93 .map(Arc::new)
94 .collect();
95
96 if client_infos.is_empty() {
97 return Err(anyhow::anyhow!("No client infos found"));
98 }
99
100 Ok(IbcRelayer {
101 simulation_gas_multipliers: self.simulation_gas_multipliers,
102 inner_log_ok: self.inner_log_ok,
103 inner_log_err: self.inner_log_err,
104 client_infos,
105 })
106 }
107
108 pub async fn prep_cache(&self, initial_cache: Option<IbcCache>) -> Result<IbcCache> {
114 let fut1 = async { self.prep_cache_inner(initial_cache).await };
115 pin_mut!(fut1);
116
117 let fut2 = async { self.check_client_updates_while_prepping().await };
118 pin_mut!(fut2);
119
120 let resp = futures::future::select(fut1, fut2).await;
121
122 match resp {
123 Either::Left((x, _)) => x,
124 Either::Right((y, _)) => {
125 y?;
126 Err(anyhow::anyhow!("unreachable"))
127 }
128 }
129 }
130
131 async fn check_client_updates_while_prepping(&self) -> Result<()> {
132 loop {
133 let client_infos_updating = {
136 let lock = self.client_infos_updating.lock().unwrap();
137 let v = &*lock;
138 v.clone()
139 };
140
141 for client_info in client_infos_updating {
142 if !matches!(
144 client_info.signing_client_1.sequence_strategy_kind(),
145 SequenceStrategyKind::Query
146 ) {
147 let current_height =
148 client_info.signing_client_1.querier.block_height().await?;
149 if client_info
150 .is_past_update_height(Side::One, current_height)
151 .await?
152 {
153 self.update_ibc_client(&client_info, Side::One).await?;
154 }
155 }
156
157 if !matches!(
159 client_info.signing_client_2.sequence_strategy_kind(),
160 SequenceStrategyKind::Query
161 ) {
162 let current_height =
163 client_info.signing_client_2.querier.block_height().await?;
164 if client_info
165 .is_past_update_height(Side::Two, current_height)
166 .await?
167 {
168 self.update_ibc_client(&client_info, Side::Two).await?;
169 }
170 }
171 }
172
173 futures_timer::Delay::new(Duration::from_secs(1)).await;
174 }
175 }
176
177 async fn update_ibc_client(&self, client_info: &ClientInfo, side: Side) -> Result<()> {
178 let log_ok = self.inner_log_ok.clone();
179 client_info
180 .update(side, &self.simulation_gas_multipliers, move |s| log_ok(s))
181 .await
182 }
183
184 async fn prep_cache_inner(&self, initial_cache: Option<IbcCache>) -> Result<IbcCache> {
185 {
186 let mut lock = self.cache.lock().unwrap();
187 if let Some(cache) = initial_cache {
188 *lock = cache;
189 }
190
191 lock.chains.retain(|k, _| {
192 self.clients.contains_key(&k.chain_id_1) && self.clients.contains_key(&k.chain_id_2)
193 })
194 }
195
196 for path in &self.paths {
197 let client_1 = self.get_signing_client(&path.chain_id_1)?;
198 let client_2 = self.get_signing_client(&path.chain_id_2)?;
199 let ibc_chain_cache_key = IbcCacheChainKey {
200 chain_id_1: path.chain_id_1.clone(),
201 chain_id_2: path.chain_id_2.clone(),
202 };
203
204 let mut ibc_client_cache = self.get_ibc_client_cache(&ibc_chain_cache_key);
205 if let Ok(c) = ibc_client_cache.as_ref() {
206 self.log_ok(format!(
207 "client {} for chain {} exists in cache, checking for staleness via update...",
208 c.ibc_client_id_1, path.chain_id_1
209 ));
210 let mut tx_builder = client_1.tx_builder();
211 if let Some(gas_simulation_multiplier) =
212 self.simulation_gas_multipliers.update_client_1
213 {
214 tx_builder.set_gas_simulate_multiplier(gas_simulation_multiplier);
215 }
216 if client_1
217 .ibc_update_client(
218 &c.ibc_client_id_1,
219 &client_2.querier,
220 None,
221 Some(tx_builder),
222 )
223 .await
224 .is_err()
225 {
226 self.log_ok(format!(
227 "client {} for chain {} exists in cache, but is stale",
228 c.ibc_client_id_1, path.chain_id_1
229 ));
230 ibc_client_cache = Err(anyhow::anyhow!("Failed to update clients"));
231 } else {
232 let mut tx_builder = client_2.tx_builder();
233 if let Some(gas_simulation_multiplier) =
234 self.simulation_gas_multipliers.update_client_2
235 {
236 tx_builder.set_gas_simulate_multiplier(gas_simulation_multiplier);
237 }
238 self.log_ok(format!("client {} for chain {} exists in cache, checking for staleness via update...", c.ibc_client_id_2, path.chain_id_2));
239 if client_2
240 .ibc_update_client(
241 &c.ibc_client_id_2,
242 &client_1.querier,
243 None,
244 Some(tx_builder),
245 )
246 .await
247 .is_err()
248 {
249 self.log_ok(format!(
250 "client {} for chain {} exists in cache, but is stale",
251 c.ibc_client_id_2, path.chain_id_2
252 ));
253 ibc_client_cache = Err(anyhow::anyhow!("Failed to update clients"));
254 }
255 }
256 }
257
258 let (conn_handshake, channel_handshake) = match ibc_client_cache {
259 Err(_) => {
260 self.log_ok(format!(
261 "Creating brand new clients for path {} <-> {}",
262 path.chain_id_1, path.chain_id_2
263 ));
264 let conn_handshake = client_1
265 .ibc_connection_handshake(
266 &client_2,
267 None,
268 None,
269 self.simulation_gas_multipliers.connection_handshake.clone(),
270 |s| self.log_ok(s),
271 )
272 .await?;
273
274 {
275 let mut lock = self.cache.lock().unwrap();
276 let mut ibc_connections = HashMap::new();
277 ibc_connections.insert(
278 IbcCacheConnectionKey {
279 connection_id_1: conn_handshake.connection_id.clone(),
280 connection_id_2: conn_handshake.counterparty_connection_id.clone(),
281 },
282 IbcChannelCache {
283 ibc_channels: HashMap::new(),
284 },
285 );
286
287 lock.chains.insert(
288 ibc_chain_cache_key.clone(),
289 IbcClientCache {
290 ibc_client_id_1: conn_handshake.client_id.clone(),
291 ibc_client_id_2: conn_handshake.counterparty_client_id.clone(),
292 ibc_connections,
293 },
294 );
295 }
296
297 anyhow::Ok((conn_handshake, None))
298 }
299 Ok(mut ibc_client_cache) => {
300 self.log_ok(format!(
301 "Clients already exist for path {} <-> {}",
302 path.chain_id_1, path.chain_id_2
303 ));
304 let entry =
305 ibc_client_cache
306 .ibc_connections
307 .iter()
308 .find_map(|(connection_ids, v)| {
309 match v
311 .ibc_channels
312 .get(&IbcCacheChannelKey {
313 port_id_1: path.port_id_1.clone(),
314 port_id_2: path.port_id_2.clone(),
315 })
316 .cloned()
317 {
318 Some((
319 channel_id,
320 counterparty_channel_id,
321 channel_version,
322 )) => {
323 if channel_version == path.channel_version {
324 Some((
325 connection_ids.clone(),
326 IbcChannelHandshake {
327 channel_id,
328 counterparty_channel_id,
329 },
330 ))
331 } else {
332 None
333 }
334 }
335 None => None,
336 }
337 });
338 match entry {
339 None => {
340 let ibc_client_id_1 = ibc_client_cache.ibc_client_id_1.clone();
341 let ibc_client_id_2 = ibc_client_cache.ibc_client_id_2.clone();
342 self.log_ok(format!(
343 "Creating new connection for path {} <-> {} over clients {},{}",
344 path.chain_id_1, path.chain_id_2, ibc_client_id_1, ibc_client_id_2
345 ));
346 let conn_handshake = client_1
347 .ibc_connection_handshake(
348 &client_2,
349 Some(ibc_client_cache.ibc_client_id_1.clone()),
350 Some(ibc_client_cache.ibc_client_id_2.clone()),
351 self.simulation_gas_multipliers.connection_handshake.clone(),
352 |s| self.log_ok(s),
353 )
354 .await?;
355
356 {
357 let mut lock = self.cache.lock().unwrap();
358 ibc_client_cache.ibc_connections.insert(
359 IbcCacheConnectionKey {
360 connection_id_1: conn_handshake.connection_id.clone(),
361 connection_id_2: conn_handshake
362 .counterparty_connection_id
363 .clone(),
364 },
365 IbcChannelCache {
366 ibc_channels: HashMap::new(),
367 },
368 );
369
370 lock.chains
371 .insert(ibc_chain_cache_key.clone(), ibc_client_cache.clone());
372 }
373
374 Ok((conn_handshake, None))
375 }
376 Some((connection_ids, channel_handshake)) => {
377 let conn_handshake = IbcConnectionHandshake {
378 client_id: ibc_client_cache.ibc_client_id_1.clone(),
379 counterparty_client_id: ibc_client_cache.ibc_client_id_2.clone(),
380 connection_id: connection_ids.connection_id_1.clone(),
381 counterparty_connection_id: connection_ids.connection_id_2.clone(),
382 };
383
384 Ok((conn_handshake, Some(channel_handshake)))
385 }
386 }
387 }
388 }?;
389
390 let IbcConnectionHandshake {
391 connection_id: ibc_connection_id_1,
392 counterparty_connection_id: ibc_connection_id_2,
393 client_id: ibc_client_id_1,
394 counterparty_client_id: ibc_client_id_2,
395 } = &conn_handshake;
396
397 let channel_handshake = match channel_handshake {
398 None => {
399 self.log_ok(format!(
400 "Creating channel over connection {}:{} <-> {}:{}, version {}",
401 path.chain_id_1,
402 ibc_connection_id_1,
403 path.chain_id_2,
404 ibc_connection_id_2,
405 path.channel_version
406 ));
407
408 let channel_handshake = client_1
409 .ibc_channel_handshake(
410 &client_2,
411 &path.port_id_1,
412 &path.port_id_2,
413 &path.channel_version,
414 path.channel_ordering,
415 &conn_handshake,
416 self.simulation_gas_multipliers.channel_handshake.clone(),
417 |s| self.log_ok(s),
418 )
419 .await?;
420
421 {
422 let mut lock = self.cache.lock().unwrap();
423 let ibc_client_cache = lock.chains.get_mut(&ibc_chain_cache_key).unwrap();
424 let ibc_connection_cache = ibc_client_cache
425 .ibc_connections
426 .get_mut(&IbcCacheConnectionKey {
427 connection_id_1: ibc_connection_id_1.clone(),
428 connection_id_2: ibc_connection_id_2.clone(),
429 })
430 .unwrap();
431 ibc_connection_cache.ibc_channels.insert(
432 IbcCacheChannelKey {
433 port_id_1: path.port_id_1.clone(),
434 port_id_2: path.port_id_2.clone(),
435 },
436 (
437 channel_handshake.channel_id.clone(),
438 channel_handshake.counterparty_channel_id.clone(),
439 path.channel_version.clone(),
440 ),
441 );
442 }
443
444 channel_handshake
445 }
446 Some(channel_handshake) => {
447 self.log_ok(format!(
448 "Channel already exists over {}:{}:{} <-> {}:{}:{}, version {}",
449 path.chain_id_1,
450 ibc_connection_id_1,
451 channel_handshake.channel_id,
452 path.chain_id_2,
453 ibc_connection_id_2,
454 channel_handshake.counterparty_channel_id,
455 path.channel_version
456 ));
457
458 channel_handshake
459 }
460 };
461
462 {
466 let client_info_channel = ClientInfoChannel {
467 channel_id_1: channel_handshake.channel_id.clone(),
468 channel_id_2: channel_handshake.counterparty_channel_id.clone(),
469 port_id_1: path.port_id_1.clone(),
470 port_id_2: path.port_id_2.clone(),
471 channel_version: path.channel_version.clone(),
472 channel_ordering: path.channel_ordering,
473 };
474
475 let has_client_info = {
476 let mut lock = self.client_infos.lock().unwrap();
477 match lock.get_mut(&ibc_chain_cache_key) {
478 Some(client_info) => {
479 client_info.channels.push(client_info_channel.clone());
482 true
483 }
484 None => false,
485 }
486 };
487
488 if !has_client_info {
489 let client_state_1 = client_1
490 .querier
491 .ibc_client_state(ibc_client_id_1, None)
492 .await?;
493 let client_state_2 = client_2
494 .querier
495 .ibc_client_state(ibc_client_id_2, None)
496 .await?;
497
498 let trusting_period_1 = client_state_1
499 .trusting_period
500 .context("No trusting period found")?;
501 let trusting_period_2 = client_state_2
502 .trusting_period
503 .context("No trusting period found")?;
504
505 let client_info = ClientInfo {
506 signing_client_1: client_1,
507 signing_client_2: client_2,
508 ibc_client_id_1: ibc_client_id_1.clone(),
509 ibc_client_id_2: ibc_client_id_2.clone(),
510 trusting_period_1: Duration::new(
511 trusting_period_1.seconds as u64,
512 trusting_period_1.nanos as u32,
513 ),
514 trusting_period_2: Duration::new(
515 trusting_period_2.seconds as u64,
516 trusting_period_2.nanos as u32,
517 ),
518 update_1: ClientUpdate::default(),
519 update_2: ClientUpdate::default(),
520 connection_id_1: ibc_connection_id_1.clone(),
521 connection_id_2: ibc_connection_id_2.clone(),
522 channels: vec![client_info_channel],
523 };
524
525 {
526 let mut lock = self.client_infos_updating.lock().unwrap();
527 let client_info_for_update = ClientInfo {
530 signing_client_1: client_info.signing_client_1.clone(),
531 signing_client_2: client_info.signing_client_2.clone(),
532 ibc_client_id_1: client_info.ibc_client_id_1.clone(),
533 ibc_client_id_2: client_info.ibc_client_id_2.clone(),
534 trusting_period_1: client_info.trusting_period_1,
535 trusting_period_2: client_info.trusting_period_2,
536 update_1: ClientUpdate::default(),
537 update_2: ClientUpdate::default(),
538 connection_id_1: client_info.connection_id_1.clone(),
539 connection_id_2: client_info.connection_id_2.clone(),
540 channels: client_info.channels.clone(),
541 };
542
543 lock.push(Arc::new(client_info_for_update));
544 }
545
546 {
547 let mut lock = self.client_infos.lock().unwrap();
548 lock.insert(ibc_chain_cache_key.clone(), client_info);
549 }
550 }
551 };
552 }
553
554 Ok(self.cache.lock().unwrap().clone())
555 }
556
557 fn get_signing_client(&self, chain_id: &ChainId) -> Result<SigningClient> {
558 self.clients
559 .get(chain_id)
560 .cloned()
561 .ok_or_else(|| anyhow::anyhow!("No signing client found for chain {}", chain_id))
562 }
563
564 fn get_ibc_client_cache(&self, chain_key: &IbcCacheChainKey) -> Result<IbcClientCache> {
565 let cache = self.cache.lock().unwrap();
566 cache.chains.get(chain_key).cloned().ok_or_else(|| {
567 anyhow::anyhow!(
568 "No chain cache found for chains {} and {}",
569 chain_key.chain_id_1,
570 chain_key.chain_id_2
571 )
572 })
573 }
574
575 fn log_ok(&self, s: String) {
576 (self.inner_log_ok)(s);
577 }
578}
579
580#[derive(Serialize, Deserialize, Debug, Default, Clone)]
581pub struct IbcCache {
582 pub chains: HashMap<IbcCacheChainKey, IbcClientCache>,
583}
584
585#[derive(Clone, PartialEq, Eq, Debug, Hash)]
586pub struct IbcCacheChainKey {
587 pub chain_id_1: ChainId,
588 pub chain_id_2: ChainId,
589}
590
591#[derive(Serialize, Deserialize, Debug, Clone)]
592pub struct IbcClientCache {
593 pub ibc_client_id_1: IbcClientId,
594 pub ibc_client_id_2: IbcClientId,
595 pub ibc_connections: HashMap<IbcCacheConnectionKey, IbcChannelCache>,
596}
597
598#[derive(Clone, PartialEq, Eq, Debug, Hash)]
599pub struct IbcCacheConnectionKey {
600 pub connection_id_1: IbcConnectionId,
601 pub connection_id_2: IbcConnectionId,
602}
603
604#[derive(Serialize, Deserialize, Debug, Clone)]
605pub struct IbcChannelCache {
606 pub ibc_channels: HashMap<IbcCacheChannelKey, (IbcChannelId, IbcChannelId, IbcChannelVersion)>,
607}
608
609#[derive(Clone, PartialEq, Eq, Debug, Hash)]
610pub struct IbcCacheChannelKey {
611 pub port_id_1: IbcPortId,
612 pub port_id_2: IbcPortId,
613}
614impl Serialize for IbcCacheChainKey {
616 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
617 where
618 S: serde::Serializer,
619 {
620 serializer.serialize_str(&format!("{}|{}", self.chain_id_1, self.chain_id_2))
621 }
622}
623
624impl<'de> Deserialize<'de> for IbcCacheChainKey {
625 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
626 where
627 D: serde::Deserializer<'de>,
628 {
629 let s = String::deserialize(deserializer)?;
630 let parts: Vec<&str> = s.split('|').collect();
631 if parts.len() != 2 {
632 return Err(serde::de::Error::custom("invalid IbcCacheClientKey format"));
633 }
634 Ok(IbcCacheChainKey {
635 chain_id_1: ChainId::new(parts[0].to_string()),
636 chain_id_2: ChainId::new(parts[1].to_string()),
637 })
638 }
639}
640
641impl Serialize for IbcCacheConnectionKey {
642 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
643 where
644 S: serde::Serializer,
645 {
646 serializer.serialize_str(&format!(
647 "{}|{}",
648 self.connection_id_1, self.connection_id_2
649 ))
650 }
651}
652
653impl<'de> Deserialize<'de> for IbcCacheConnectionKey {
654 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
655 where
656 D: serde::Deserializer<'de>,
657 {
658 let s = String::deserialize(deserializer)?;
659 let parts: Vec<&str> = s.split('|').collect();
660 if parts.len() != 2 {
661 return Err(serde::de::Error::custom(
662 "invalid IbcCacheConnectionKey format",
663 ));
664 }
665 Ok(IbcCacheConnectionKey {
666 connection_id_1: IbcConnectionId::new(parts[0].to_string()),
667 connection_id_2: IbcConnectionId::new(parts[1].to_string()),
668 })
669 }
670}
671
672impl Serialize for IbcCacheChannelKey {
673 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
674 where
675 S: serde::Serializer,
676 {
677 serializer.serialize_str(&format!("{}|{}", self.port_id_1, self.port_id_2))
678 }
679}
680
681impl<'de> Deserialize<'de> for IbcCacheChannelKey {
682 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
683 where
684 D: serde::Deserializer<'de>,
685 {
686 let s = String::deserialize(deserializer)?;
687 let parts: Vec<&str> = s.split('|').collect();
688 if parts.len() != 2 {
689 return Err(serde::de::Error::custom(
690 "invalid IbcCacheChannelKey format",
691 ));
692 }
693 Ok(IbcCacheChannelKey {
694 port_id_1: IbcPortId::new(parts[0].to_string()),
695 port_id_2: IbcPortId::new(parts[1].to_string()),
696 })
697 }
698}