use alloy::network::EthereumWallet;
use alloy::primitives::Address;
use alloy::providers::ProviderBuilder;
use alloy::signers::local::PrivateKeySigner;
use blueprint_keystore::Keystore;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
/// Convenience alias for a `Result` whose error type is [`KeeperError`].
pub type KeeperResult<T> = Result<T, KeeperError>;
/// Error type carried by [`KeeperResult`].
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Error, Debug)]
pub enum KeeperError {
/// Configuration problem described by the contained message.
///
/// In this file it is also used for an unparsable private key and for a
/// keeper task that could not be joined.
#[error("Configuration error: {0}")]
Config(String),
/// Failure reported by the keystore. `#[from]` generates the `From`
/// conversion, so `?` can lift a `blueprint_keystore::Error` directly.
#[error("Keystore error: {0}")]
Keystore(#[from] blueprint_keystore::Error),
/// Transaction failure described by the contained message.
/// NOTE(review): not constructed in the visible part of this file.
#[error("Transaction error: {0}")]
Transaction(String),
/// Contract failure described by the contained message.
/// NOTE(review): not constructed in the visible part of this file.
#[error("Contract error: {0}")]
Contract(String),
/// Provider failure; in this file it holds the stringified error returned
/// when connecting to the RPC endpoint fails.
#[error("Provider error: {0}")]
Provider(String),
}
/// Handle to a keeper task together with the keeper's name.
pub struct KeeperHandle {
/// Join handle of the keeper task; awaiting it yields the task's `KeeperResult<()>`.
pub handle: JoinHandle<KeeperResult<()>>,
/// Keeper name, included in the error message when joining the task fails.
pub name: &'static str,
}
impl KeeperHandle {
    /// Waits for the keeper task to finish and returns its result.
    ///
    /// # Errors
    ///
    /// Returns the task's own error when the task completed with `Err`.
    /// When the task could not be joined, returns [`KeeperError::Config`] with
    /// a message stating whether the task was cancelled or panicked.
    pub async fn join(self) -> KeeperResult<()> {
        // Split `self` up front so the name stays usable after the handle is
        // consumed by `.await`.
        let Self { handle, name } = self;
        match handle.await {
            Ok(task_result) => task_result,
            // A `JoinError` is produced for cancellation as well as for a
            // panic; report the two cases separately instead of calling both
            // a panic.
            Err(join_err) if join_err.is_cancelled() => Err(KeeperError::Config(format!(
                "Keeper {} was cancelled: {}",
                name, join_err
            ))),
            Err(join_err) => Err(KeeperError::Config(format!(
                "Keeper {} panicked: {}",
                name, join_err
            ))),
        }
    }
}
/// Keeper configuration: RPC endpoint, keystore, optional contract addresses,
/// and interval / batch-size settings.
///
/// Defaults mentioned below are the ones assigned by `KeeperConfig::new`.
#[derive(Clone)]
pub struct KeeperConfig {
/// Endpoint string handed to `ProviderBuilder::connect` when a provider is built.
pub http_rpc_endpoint: String,
/// Keystore from which the local K256 ECDSA key used for signing is read.
pub keystore: Arc<Keystore>,
/// Optional address; `None` by default.
/// NOTE(review): presumably the inflation pool contract — consumer not visible here.
pub inflation_pool: Option<Address>,
/// Optional address; `None` by default.
/// NOTE(review): presumably the multi-asset delegation contract — consumer not visible here.
pub multi_asset_delegation: Option<Address>,
/// Optional address; `None` by default.
/// NOTE(review): presumably the streaming payment manager contract — consumer not visible here.
pub streaming_payment_manager: Option<Address>,
/// Epoch check interval; 300 seconds by default.
pub epoch_check_interval: Duration,
/// Round check interval; 60 seconds by default.
pub round_check_interval: Duration,
/// Stream check interval; 600 seconds by default.
pub stream_check_interval: Duration,
/// Operator addresses to monitor; empty by default.
/// NOTE(review): how the list is consumed is not visible here.
pub monitored_operators: Vec<Address>,
/// Optional address; `None` by default.
/// NOTE(review): presumably the Tangle contract — consumer not visible here.
pub tangle_contract: Option<Address>,
/// Billing check interval; 60 seconds by default.
pub billing_check_interval: Duration,
/// Billing rescan interval; 300 seconds by default.
pub billing_rescan_interval: Duration,
/// Billing batch size setting; 50 by default.
/// NOTE(review): presumably an upper bound per batch — confirm against the billing keeper.
pub billing_max_batch_size: usize,
}
impl KeeperConfig {
pub fn new(http_rpc_endpoint: String, keystore: Arc<Keystore>) -> Self {
Self {
http_rpc_endpoint,
keystore,
inflation_pool: None,
multi_asset_delegation: None,
streaming_payment_manager: None,
epoch_check_interval: Duration::from_secs(300), round_check_interval: Duration::from_secs(60), stream_check_interval: Duration::from_secs(600), monitored_operators: Vec::new(),
tangle_contract: None,
billing_check_interval: Duration::from_secs(60), billing_rescan_interval: Duration::from_secs(300), billing_max_batch_size: 50,
}
}
pub fn with_inflation_pool(mut self, address: Address) -> Self {
self.inflation_pool = Some(address);
self
}
pub fn with_multi_asset_delegation(mut self, address: Address) -> Self {
self.multi_asset_delegation = Some(address);
self
}
pub fn with_streaming_payment_manager(mut self, address: Address) -> Self {
self.streaming_payment_manager = Some(address);
self
}
pub fn with_epoch_interval(mut self, interval: Duration) -> Self {
self.epoch_check_interval = interval;
self
}
pub fn with_round_interval(mut self, interval: Duration) -> Self {
self.round_check_interval = interval;
self
}
pub fn with_stream_interval(mut self, interval: Duration) -> Self {
self.stream_check_interval = interval;
self
}
pub fn with_monitored_operators(mut self, operators: Vec<Address>) -> Self {
self.monitored_operators = operators;
self
}
pub fn with_tangle_contract(mut self, address: Address) -> Self {
self.tangle_contract = Some(address);
self
}
pub fn with_billing_interval(mut self, interval: Duration) -> Self {
self.billing_check_interval = interval;
self
}
pub fn with_billing_rescan_interval(mut self, interval: Duration) -> Self {
self.billing_rescan_interval = interval;
self
}
pub fn with_billing_max_batch_size(mut self, size: usize) -> Self {
self.billing_max_batch_size = size;
self
}
pub fn get_signer(&self) -> KeeperResult<PrivateKeySigner> {
use blueprint_crypto::BytesEncoding;
use blueprint_keystore::backends::Backend;
use blueprint_keystore::crypto::k256::K256Ecdsa;
let ecdsa_public = self
.keystore
.as_ref()
.first_local::<K256Ecdsa>()
.map_err(KeeperError::Keystore)?;
let ecdsa_secret = self
.keystore
.as_ref()
.get_secret::<K256Ecdsa>(&ecdsa_public)
.map_err(KeeperError::Keystore)?;
let private_key = alloy::primitives::hex::encode(ecdsa_secret.to_bytes());
private_key
.parse()
.map_err(|e| KeeperError::Config(format!("Invalid private key: {}", e)))
}
pub fn get_operator_address(&self) -> KeeperResult<Address> {
let signer = self.get_signer()?;
Ok(signer.address())
}
pub async fn get_provider(&self) -> KeeperResult<impl alloy::providers::Provider + Clone> {
let signer = self.get_signer()?;
let wallet = EthereumWallet::from(signer);
ProviderBuilder::new()
.wallet(wallet)
.connect(&self.http_rpc_endpoint)
.await
.map_err(|e| KeeperError::Provider(e.to_string()))
}
pub async fn get_read_provider(&self) -> KeeperResult<impl alloy::providers::Provider + Clone> {
ProviderBuilder::new()
.connect(&self.http_rpc_endpoint)
.await
.map_err(|e| KeeperError::Provider(e.to_string()))
}
}
/// Interface for a keeper that is started from a [`KeeperConfig`] and a
/// shutdown receiver and is represented by a [`KeeperHandle`].
pub trait BackgroundKeeper: Sized {
/// Static name of the keeper.
const NAME: &'static str;
/// Starts the keeper with `config` and a broadcast `shutdown` receiver, and
/// returns the handle for it.
/// NOTE(review): how `shutdown` is honoured is up to implementors — not visible here.
fn start(config: KeeperConfig, shutdown: broadcast::Receiver<()>) -> KeeperHandle;
/// Returns a `Send` future that resolves to `KeeperResult<bool>`.
/// NOTE(review): the `bool` presumably reports whether an action was
/// executed — confirm against implementors.
fn check_and_execute(
config: &KeeperConfig,
) -> impl std::future::Future<Output = KeeperResult<bool>> + Send;
}