use core::net::SocketAddr;
use core::net::SocketAddrV4;
use std::env;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::process::Child;
use std::process::Command;
use std::process::Stdio;
use std::thread::sleep;
use std::time::Duration;
use std::time::Instant;
use corepc_client::bitcoin::BlockHash;
use corepc_client::bitcoin::Script;
use corepc_client::bitcoin::Txid;
use electrum_client::ElectrumApi;
use electrum_client::Error as ElectrumError;
use electrum_client::HeaderNotification;
use electrum_client::raw_client::ElectrumPlaintextStream;
use electrum_client::raw_client::RawClient;
use tracing::debug;
use crate::BitcoinD;
use crate::DataDir;
use crate::Error;
use crate::IPV4_LOCALHOST;
use crate::POLL_INTERVAL;
use crate::SPAWN_ATTEMPTS;
use crate::SPAWN_INTERVAL;
use crate::get_available_port;
use crate::node::Node;
use crate::pipe_to_tracing;
mod versions;
pub const ELECTRS_INDEXING_TIMEOUT: Duration = Duration::from_secs(30);
pub fn get_electrs_path() -> Result<PathBuf, Error> {
#[allow(unused_mut)]
let mut bin_path = PathBuf::from(option_env!("HALFIN_ELECTRS_PATH").unwrap_or(""));
#[cfg(target_os = "windows")]
if bin_path.extension().is_none() {
bin_path.set_extension("exe");
}
let bin_name = ElectrsD::get_bin_name().to_string();
match bin_path.exists() {
true => Ok(bin_path),
false => Err(Error::BinaryNotFound((bin_name, bin_path))),
}
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ElectrsDConf<'a> {
pub args: Vec<&'a str>,
pub network: &'a str,
pub tmpdir: Option<PathBuf>,
pub staticdir: Option<PathBuf>,
pub max_retries: u8,
}
impl Default for ElectrsDConf<'_> {
fn default() -> Self {
ElectrsDConf {
args: vec![],
network: "regtest",
tmpdir: None,
staticdir: None,
max_retries: SPAWN_ATTEMPTS,
}
}
}
#[derive(Debug)]
pub struct ElectrsD {
process: Child,
pub client: RawClient<ElectrumPlaintextStream>,
working_directory: DataDir,
electrum_socket: SocketAddr,
monitoring_socket: SocketAddr,
}
#[rustfmt::skip]
impl ElectrsD {
pub fn get_name() -> &'static str { versions::ELECTRS_NAME }
pub fn get_bin_name() -> &'static str { versions::ELECTRS_BIN_NAME }
}
impl ElectrsD {
pub fn new(bitcoind: &BitcoinD) -> Result<Self, Error> {
Self::from_bin(get_electrs_path()?, bitcoind)
}
pub fn new_with_conf(bitcoind: &BitcoinD, conf: &ElectrsDConf) -> Result<Self, Error> {
Self::from_bin_with_conf(get_electrs_path()?, bitcoind, conf)
}
pub fn from_bin<P: AsRef<Path>>(electrs_bin: P, bitcoind: &BitcoinD) -> Result<Self, Error> {
Self::from_bin_with_conf(electrs_bin, bitcoind, &ElectrsDConf::default())
}
pub fn from_bin_with_conf<P: AsRef<Path>>(
electrs_bin: P,
bitcoind: &BitcoinD,
conf: &ElectrsDConf,
) -> Result<Self, Error> {
let electrs_bin = electrs_bin.as_ref();
if !electrs_bin.is_absolute() {
return Err(Error::BinaryPathNotAbsolute {
bin_name: Self::get_bin_name().to_string(),
path: electrs_bin.display().to_string(),
});
}
if !electrs_bin.is_file() {
return Err(Error::BinaryPathNotFile {
bin_name: Self::get_bin_name().to_string(),
path: electrs_bin.display().to_string(),
});
}
Self::ensure_bitcoind_ready(bitcoind)?;
for _attempt in 0..conf.max_retries {
let working_directory = Self::init_work_dir(conf)?;
let electrum_port = get_available_port();
let electrum_socket = SocketAddr::V4(SocketAddrV4::new(IPV4_LOCALHOST, electrum_port));
let monitoring_port = get_available_port();
let monitoring_socket =
SocketAddr::V4(SocketAddrV4::new(IPV4_LOCALHOST, monitoring_port));
let mut args: Vec<String> = conf.args.iter().map(ToString::to_string).collect();
args.extend([
"--db-dir".to_string(),
working_directory.path().display().to_string(),
"--network".to_string(),
conf.network.to_string(),
"--daemon-rpc-addr".to_string(),
bitcoind.rpc_socket().to_string(),
"--daemon-p2p-addr".to_string(),
bitcoind.get_p2p_socket().to_string(),
"--electrum-rpc-addr".to_string(),
electrum_socket.to_string(),
"--monitoring-addr".to_string(),
monitoring_socket.to_string(),
"--cookie-file".to_string(),
bitcoind.cookie_file().display().to_string(),
]);
debug!(
"Spawning {} [ELECTRUM_SOCKET={}, MONITORING_SOCKET={}, DATADIR={}]",
Self::get_name(),
electrum_socket,
monitoring_socket,
working_directory.path().display()
);
let mut process = Command::new(electrs_bin)
.args(&args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(Error::FailedToSpawn)?;
if let Some(stdout) = process.stdout.take() {
pipe_to_tracing(stdout, "electrs");
}
if let Some(stderr) = process.stderr.take() {
pipe_to_tracing(stderr, "electrs");
}
sleep(SPAWN_INTERVAL);
match process.try_wait() {
Ok(Some(_)) | Err(_) => {
debug!(
"{} exited immediately, retrying with fresh ports",
Self::get_name()
);
let _ = process.kill();
continue;
}
Ok(None) => {}
}
if let Ok(client) =
Self::wait_for_client(electrum_socket, &mut process, Duration::from_secs(10))
{
sleep(Duration::from_millis(200));
debug!(
"Started {} [PID={}, ELECTRUM_SOCKET={}, MONITORING_SOCKET={}, DATADIR={}]",
Self::get_name(),
process.id(),
electrum_socket,
monitoring_socket,
working_directory.path().display()
);
return Ok(Self {
process,
client,
working_directory,
electrum_socket,
monitoring_socket,
});
}
let _ = process.kill();
}
Err(Error::ExhaustedNodeBuildingAttempts(conf.max_retries))
}
#[cfg(not(target_os = "windows"))]
pub fn trigger(&self) -> Result<(), Error> {
debug!(
"{}: triggering rescan pid={}",
Self::get_name(),
self.process.id()
);
let status = Command::new("kill")
.arg("-USR1")
.arg(self.process.id().to_string())
.status()
.map_err(Error::Io)?;
if status.success() {
debug!("{}: triggered rescan", Self::get_name());
Ok(())
} else {
Err(Error::UnexpectedResponse(format!(
"failed to trigger electrs rescan with exit status={status}"
)))
}
}
#[cfg(target_os = "windows")]
pub fn trigger(&self) -> Result<(), Error> {
debug!(
"{}: skipped rescan trigger on Windows",
ElectrsD::get_name()
);
Ok(())
}
pub fn stop(&mut self) -> Result<std::process::ExitStatus, Error> {
debug!("Stopping {} [PID={}]", Self::get_name(), self.process.id());
let _ = self.process.kill();
self.process.wait().map_err(Error::Io)
}
pub fn get_pid(&self) -> u32 {
let pid = self.process.id();
debug!("{}: got pid={}", Self::get_name(), pid);
pid
}
pub fn get_working_directory(&self) -> PathBuf {
let working_directory = self.working_directory.path();
debug!(
"{}: got working directory at path={}",
Self::get_name(),
working_directory.display()
);
working_directory
}
pub fn get_electrum_client(&self) -> &RawClient<ElectrumPlaintextStream> {
debug!(
"{}: got electrum client for socket={}",
Self::get_name(),
self.electrum_socket
);
&self.client
}
pub fn electrum_socket(&self) -> SocketAddr {
debug!(
"{}: got electrum socket at socket={}",
Self::get_name(),
self.electrum_socket
);
self.electrum_socket
}
pub fn electrum_url(&self) -> String {
let electrum_url = self.electrum_socket.to_string();
debug!(
"{}: got electrum url at url={}",
Self::get_name(),
electrum_url
);
electrum_url
}
pub fn monitoring_socket(&self) -> SocketAddr {
debug!(
"{}: got monitoring socket at socket={}",
Self::get_name(),
self.monitoring_socket
);
self.monitoring_socket
}
pub fn wait_until_caught_up(
&self,
bitcoind: &BitcoinD,
timeout: Option<Duration>,
) -> Result<(), Error> {
let height = bitcoind.get_chain_tip()?;
let hash = bitcoind.get_block_hash(height)?;
debug!(
"{}: waiting until caught up height={} hash={}",
Self::get_name(),
height,
hash
);
self.wait_until_block(height, Some(hash), timeout)
}
pub fn wait_until_tip(
&self,
exp_height: u32,
exp_hash: BlockHash,
timeout: Option<Duration>,
) -> Result<(), Error> {
debug!(
"{}: waiting until tip height={} hash={}",
Self::get_name(),
exp_height,
exp_hash
);
self.wait_until_block(exp_height, Some(exp_hash), timeout)
}
pub fn wait_until_mempool_tx(
&self,
spk: &Script,
txid: Txid,
timeout: Option<Duration>,
) -> Result<(), Error> {
debug!(
"{}: waiting until mempool transaction txid={}",
Self::get_name(),
txid
);
let (subscribed, initial_status) = match self.client.script_subscribe(spk) {
Ok(status) => (true, status),
Err(ElectrumError::AlreadySubscribed(_)) => (false, None),
Err(err) => return Err(Error::UnresponsiveElectrsD(err)),
};
let timeout = timeout.unwrap_or(ELECTRS_INDEXING_TIMEOUT);
let result = (|| {
if initial_status.is_some() && self.script_history_has_mempool_tx(spk, txid)? {
debug!(
"{}: found mempool transaction with txid={}",
Self::get_name(),
txid
);
return Ok(());
}
let start = Instant::now();
while start.elapsed() < timeout {
self.trigger()?;
self.client.ping().map_err(Error::UnresponsiveElectrsD)?;
if self
.client
.script_pop(spk)
.map_err(Error::UnresponsiveElectrsD)?
.is_some()
&& self.script_history_has_mempool_tx(spk, txid)?
{
debug!(
"{}: found mempool transaction with txid={}",
Self::get_name(),
txid
);
return Ok(());
}
sleep(2 * POLL_INTERVAL);
}
Err(Error::ElectrsDIndexTimeout((
format!("mempool transaction with txid={txid}"),
timeout,
)))
})();
if subscribed {
let _ = self.client.script_unsubscribe(spk);
}
result
}
fn script_history_has_mempool_tx(&self, spk: &Script, txid: Txid) -> Result<bool, Error> {
self.client
.script_get_history(spk)
.map(|history| {
let has_tx = history
.iter()
.any(|entry| entry.tx_hash == txid && entry.height == 0);
debug!(
"{}: checked script mempool transaction with txid={} found={}",
Self::get_name(),
txid,
has_tx
);
has_tx
})
.map_err(Error::UnresponsiveElectrsD)
}
fn wait_until_block(
&self,
exp_height: u32,
exp_hash: Option<BlockHash>,
timeout: Option<Duration>,
) -> Result<(), Error> {
let client = self.get_electrum_client();
let mut next_notification = Some(
client
.block_headers_subscribe()
.map_err(Error::UnresponsiveElectrsD)?,
);
let description = match exp_hash {
Some(hash) => format!("block {exp_height} ({hash})"),
None => format!("block {exp_height}"),
};
let timeout = timeout.unwrap_or(ELECTRS_INDEXING_TIMEOUT);
debug!(
"{}: waiting until indexed {} timeout={:?}",
Self::get_name(),
description,
timeout
);
let start = Instant::now();
while start.elapsed() < timeout {
self.trigger()?;
client.ping().map_err(Error::UnresponsiveElectrsD)?;
let notification = match next_notification.take() {
Some(notification) => Some(notification),
None => client
.block_headers_pop()
.map_err(Error::UnresponsiveElectrsD)?,
};
let Some(notification) = notification else {
sleep(2 * POLL_INTERVAL);
continue;
};
if electrs_header_matches(client, ¬ification, exp_height, exp_hash)? {
debug!("{}: finished indexing {}", Self::get_name(), description);
return Ok(());
}
sleep(2 * POLL_INTERVAL);
}
Err(Error::ElectrsDIndexTimeout((description, timeout)))
}
fn ensure_bitcoind_ready(bitcoind: &BitcoinD) -> Result<(), Error> {
let blockchain_info = bitcoind.call("getblockchaininfo", &[])?;
let initial_block_download = blockchain_info
.get("initialblockdownload")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
debug!(
"{}: checked backing bitcoind readiness initial_block_download={}",
Self::get_name(),
initial_block_download
);
if initial_block_download {
let _ = bitcoind.generate(1)?;
}
Ok(())
}
fn init_work_dir(conf: &ElectrsDConf) -> Result<DataDir, Error> {
let tmpdir = conf
.tmpdir
.clone()
.or_else(|| env::var("TEMPDIR_ROOT").map(PathBuf::from).ok());
let work_dir = match (&tmpdir, &conf.staticdir) {
(Some(_), Some(_)) => return Err(Error::BothDirsSpecified),
(None, Some(workdir)) => {
fs::create_dir_all(workdir).map_err(Error::Io)?;
DataDir::Persistent(workdir.to_owned())
}
(Some(tmpdir), None) => DataDir::Temporary(
tempfile::Builder::new()
.prefix("halfin-electrs-")
.tempdir_in(tmpdir)
.map_err(Error::Io)?,
),
(None, None) => DataDir::Temporary(
tempfile::Builder::new()
.prefix("halfin-electrs-")
.tempdir()
.map_err(Error::Io)?,
),
};
Ok(work_dir)
}
fn wait_for_client(
electrum_socket: SocketAddr,
process: &mut Child,
timeout: Duration,
) -> Result<RawClient<ElectrumPlaintextStream>, Error> {
let start = Instant::now();
let mut last_error = None;
while start.elapsed() < timeout {
match process.try_wait() {
Ok(Some(_)) | Err(_) => {
return Err(Error::RpcClientSetupTimeout);
}
Ok(None) => {}
}
match RawClient::new(electrum_socket, Some(Duration::from_millis(500)), None) {
Ok(client) => match client.ping() {
Ok(()) => return Ok(client),
Err(err) => last_error = Some(err),
},
Err(err) => last_error = Some(err),
}
sleep(Duration::from_millis(200));
}
Err(last_error.map_or(Error::RpcClientSetupTimeout, Error::UnresponsiveElectrsD))
}
}
impl Drop for ElectrsD {
fn drop(&mut self) {
debug!(
"{}: killing process with pid={}",
Self::get_name(),
self.process.id()
);
let _ = self.process.kill();
}
}
fn electrs_header_matches(
client: &RawClient<ElectrumPlaintextStream>,
notification: &HeaderNotification,
exp_height: u32,
exp_hash: Option<BlockHash>,
) -> Result<bool, Error> {
let notification_height = u32::try_from(notification.height)
.map_err(|err| Error::UnexpectedResponse(err.to_string()))?;
if notification_height < exp_height {
return Ok(false);
}
let header = if notification_height == exp_height {
notification.header
} else {
client
.block_header(
usize::try_from(exp_height)
.map_err(|err| Error::UnexpectedResponse(err.to_string()))?,
)
.map_err(Error::UnresponsiveElectrsD)?
};
Ok(exp_hash.is_none_or(|exp_hash| header.block_hash() == exp_hash))
}