webb-relayer 0.4.1

The Webb Relayer toolkit
Documentation
// Copyright 2022 Webb Technologies Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#![warn(missing_docs)]

//! # Relayer Context Module 🕸️
//!
//! A module for managing the context of the relayer.
use std::convert::TryFrom;
use std::time::Duration;

use tokio::sync::broadcast;
use webb::evm::ethers::core::k256::SecretKey;
use webb::evm::ethers::prelude::*;
use webb::substrate::subxt;
use webb::substrate::subxt::sp_core::sr25519::Pair as Sr25519Pair;

use crate::config;
/// RelayerContext contains Relayer's configuration and shutdown signal.
#[derive(Clone)]
pub struct RelayerContext {
    /// The configuration of the relayer.
    pub config: config::WebbRelayerConfig,
    /// Broadcasts a shutdown signal to all active connections.
    ///
    /// The initial `shutdown` trigger is provided by the `run` caller. The
    /// server is responsible for gracefully shutting down active connections.
    /// When a connection task is spawned, it is passed a broadcast receiver
    /// handle. When a graceful shutdown is initiated, a `()` value is sent via
    /// the broadcast::Sender. Each active connection receives it, reaches a
    /// safe terminal state, and completes the task.
    notify_shutdown: broadcast::Sender<()>,
}

impl RelayerContext {
    /// Creates a new RelayerContext.
    pub fn new(config: config::WebbRelayerConfig) -> Self {
        let (notify_shutdown, _) = broadcast::channel(2);
        Self {
            config,
            notify_shutdown,
        }
    }
    /// Returns a broadcast receiver handle for the shutdown signal.
    pub fn shutdown_signal(&self) -> Shutdown {
        Shutdown::new(self.notify_shutdown.subscribe())
    }
    /// Sends a shutdown signal to all subscribed tasks/connections.
    pub fn shutdown(&self) {
        let _ = self.notify_shutdown.send(());
    }
    /// Returns a new `EthereumProvider` for the relayer.
    ///
    /// # Arguments
    ///
    /// * `chain_id` - A string representing the chain id.
    ///
    /// # Examples
    ///
    /// ```
    /// let chain_name = "mainnet".to_string();
    /// let provider = ctx.evm_provider(chain_name).await?;
    /// ```
    pub async fn evm_provider(
        &self,
        chain_id: &str,
    ) -> crate::Result<Provider<Http>> {
        let chain_config = self.config.evm.get(chain_id).ok_or_else(|| {
            crate::Error::ChainNotFound {
                chain_id: chain_id.to_string(),
            }
        })?;
        let provider = Provider::try_from(chain_config.http_endpoint.as_str())?
            .interval(Duration::from_millis(5u64));
        Ok(provider)
    }
    /// Sets up and returns an EVM wallet for the relayer.
    ///
    /// # Arguments
    ///
    /// * `chain_id` - A string representing the chain id.
    ///
    /// # Examples
    ///
    /// ```
    /// let chain_name = "mainnet".to_string();
    /// let wallet = self.ctx.evm_wallet(&self.chain_name).await?;
    /// ```
    pub async fn evm_wallet(
        &self,
        chain_name: &str,
    ) -> crate::Result<LocalWallet> {
        let chain_config =
            self.config.evm.get(chain_name).ok_or_else(|| {
                crate::Error::ChainNotFound {
                    chain_id: chain_name.to_string(),
                }
            })?;
        let private_key = chain_config
            .private_key
            .as_ref()
            .ok_or(crate::Error::MissingSecrets)?;
        let key = SecretKey::from_be_bytes(private_key.as_bytes())?;
        let chain_id = chain_config.chain_id;
        let wallet = LocalWallet::from(key).with_chain_id(chain_id);
        Ok(wallet)
    }
    /// Sets up and returns a Substrate client for the relayer.
    ///
    /// # Arguments
    ///
    /// * `chain_id` - A string representing the chain ID.
    ///
    /// # Examples
    ///
    /// ```
    /// let chain_id = "4".to_string();
    /// let client = ctx.substrate_provider::<subxt::DefaultConfig>(chain_id).await?;
    /// ```
    pub async fn substrate_provider<C: subxt::Config>(
        &self,
        chain_id: &str,
    ) -> crate::Result<subxt::Client<C>> {
        let node_config =
            self.config.substrate.get(chain_id).ok_or_else(|| {
                crate::Error::NodeNotFound {
                    chain_id: chain_id.to_string(),
                }
            })?;
        let client = subxt::ClientBuilder::new()
            .set_url(node_config.ws_endpoint.as_str())
            .build()
            .await?;
        Ok(client)
    }
    /// Sets up and returns a Substrate wallet for the relayer.
    ///
    /// # Arguments
    ///
    /// * `chain_id` - A string representing the chain ID.
    ///
    /// # Examples
    ///
    /// ```
    /// let chain_id = "4".to_string();
    /// let pair = ctx.substrate_wallet(chain_id).await?;
    /// ```
    pub async fn substrate_wallet(
        &self,
        chain_id: &str,
    ) -> crate::Result<Sr25519Pair> {
        let node_config = self
            .config
            .substrate
            .get(chain_id)
            .cloned()
            .ok_or_else(|| crate::Error::NodeNotFound {
                chain_id: chain_id.to_string(),
            })?;
        let suri_key = node_config.suri.ok_or(crate::Error::MissingSecrets)?;
        Ok(suri_key.into())
    }
}

/// Listens for the server shutdown signal.
///
/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
/// ever sent. Once a value has been sent via the broadcast channel, the server
/// should shutdown.
///
/// The `Shutdown` struct listens for the signal and tracks that the signal has
/// been received. Callers may query for whether the shutdown signal has been
/// received or not.
#[derive(Debug)]
pub struct Shutdown {
    /// `true` if the shutdown signal has been received
    shutdown: bool,

    /// The receive half of the channel used to listen for shutdown.
    notify: broadcast::Receiver<()>,
}

impl Shutdown {
    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
    pub fn new(notify: broadcast::Receiver<()>) -> Shutdown {
        Shutdown {
            shutdown: false,
            notify,
        }
    }

    /// Receive the shutdown notice, waiting if necessary.
    pub async fn recv(&mut self) {
        // If the shutdown signal has already been received, then return
        // immediately.
        if self.shutdown {
            return;
        }

        // Cannot receive a "lag error" as only one value is ever sent.
        let _ = self.notify.recv().await;

        // Remember that the signal has been received.
        self.shutdown = true;
    }
}