mod error;
use corepc_node::Client;
use std::{
fs::File,
io::{self, BufRead, BufReader, Read, Write},
net::TcpListener,
path::{Path, PathBuf},
process::{Child, Command, Stdio},
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
mpsc::{self, Receiver},
Arc,
},
thread::{self, sleep},
time::{Duration, Instant},
};
use temp_dir::TempDir;
pub use error::Error;
#[derive(Debug, PartialEq, Eq, Clone)]
#[non_exhaustive]
pub struct Conf<'a> {
pub args: Vec<&'a str>,
attempts: u8,
pub ip: Option<String>,
pub port: Option<u16>,
pub binary: Option<String>,
}
impl Default for Conf<'_> {
fn default() -> Self {
Self {
args: Vec::new(),
attempts: 5,
ip: None,
port: None,
binary: None,
}
}
}
pub fn get_available_port() -> Result<u16, Error> {
let t = TcpListener::bind(("127.0.0.1", 0))?; Ok(t.local_addr().map(|s| s.port())?)
}
pub struct BlindbitD {
pub process: Child,
pub work_dir: TempDir,
pub logs: Receiver<String>,
pub port: u16,
pub addr: String,
pub binary: PathBuf,
pub bitcoin: corepc_node::Node,
}
fn try_read_line<R: BufRead>(reader: &mut R) -> io::Result<Option<String>> {
let mut buffer = Vec::new();
match reader.read_until(b'\n', &mut buffer)? {
0 => Ok(None), _ => {
if let Ok(line) = String::from_utf8(buffer) {
Ok(Some(line))
} else {
Ok(None)
}
}
}
}
impl BlindbitD {
pub fn new() -> Result<BlindbitD, Error> {
BlindbitD::with_conf(&Conf::default())
}
pub fn with_conf(conf: &Conf) -> Result<BlindbitD, Error> {
let mut args = conf.args.clone();
let ip = conf.ip.clone().unwrap_or("127.0.0.1".into());
let port = conf.port.unwrap_or(get_available_port()?);
let file_path = file!();
let mut bin_dir = Path::new(file_path)
.parent()
.unwrap()
.parent()
.unwrap()
.to_path_buf();
bin_dir.push("bin");
bin_dir.push("blindbit_bcd562f");
let bin = if let Some(bin) = conf.binary.clone() {
bin
} else if let Some(bin) = &bin_dir.to_str() {
bin.to_string()
} else {
panic!("no valid binary path")
};
let exe = Path::new(&bin);
if !exe.exists() {
panic!("path {:?} does not exists!", exe);
}
if !exe.is_file() {
panic!(" path {:?} is not a file!", exe);
}
let work_dir = TempDir::with_prefix("blindbit_").unwrap();
let bitcoind = corepc_node::Node::from_downloaded().unwrap();
let bitcoind_addr = bitcoind.params.rpc_socket;
let bitcoind_cookie = bitcoind.params.cookie_file.clone().canonicalize().unwrap();
let config_path = work_dir.child("blindbit.toml");
let mut file = File::create(config_path.clone())?;
writeln!(&file, "host = \"{ip}:{port}\"").unwrap();
writeln!(file, "chain = \"regtest\"").unwrap();
writeln!(file, "rpc_endpoint = \"http://{bitcoind_addr}\"").unwrap();
writeln!(
file,
"cookie_path = \"{}\"",
bitcoind_cookie.to_str().unwrap()
)
.unwrap();
writeln!(file, "sync_start_height = 1").unwrap();
writeln!(file, "max_parallel_tweak_computations = 4").unwrap();
writeln!(file, "max_parallel_requests = 4").unwrap();
writeln!(file, "tweaks_only = 0").unwrap();
writeln!(file, "tweaks_full_basic = 1").unwrap();
writeln!(file, "tweaks_full_with_dust_filter = 0").unwrap();
writeln!(file, "tweaks_cut_through_with_dust_filter = 0").unwrap();
drop(file);
let mut file = File::open(config_path).unwrap();
let mut content = String::new();
file.read_to_string(&mut content).unwrap();
args.push("--datadir");
let cfg_path = work_dir.path();
let path = cfg_path.to_str().expect("hardcoded");
args.push(path);
let (sender, logs) = mpsc::channel();
let mut p = None;
#[allow(clippy::never_loop)]
'f: for _ in 0..conf.attempts {
let mut process = Command::new(exe)
.args(args.clone())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let timeout = Instant::now() + Duration::from_secs(3);
let stdout = process.stdout.take().unwrap();
let mut stdout_reader = BufReader::new(stdout);
let s = sender.clone();
let stop = Arc::new(AtomicBool::new(false));
let stop2 = stop.clone();
thread::spawn(move || loop {
if let Ok(Some(line)) = try_read_line(&mut stdout_reader) {
let _ = s.send(line);
} else if stop2.load(Relaxed) {
break;
}
});
loop {
if Instant::now() > timeout {
let _ = process.kill();
stop.store(true, Relaxed);
return Err(Error::Start);
} else if let Ok(log) = logs.try_recv() {
if log.contains("Listening and serving HTTP") {
p = Some(process);
break 'f;
} else {
sleep(Duration::from_millis(10));
}
}
}
}
let mut process = if let Some(p) = p {
p
} else {
panic!("Fail to start BlindbitD after {} attempts", conf.attempts);
};
let stderr = process.stderr.take().unwrap();
thread::spawn(move || {
let reader = BufReader::new(stderr);
for line in reader.lines() {
sender.send(line.unwrap()).unwrap();
}
});
Ok(BlindbitD {
process,
work_dir,
logs,
addr: ip.clone(),
port,
binary: exe.to_path_buf(),
bitcoin: bitcoind,
})
}
pub fn workdir(&self) -> PathBuf {
self.work_dir.path().to_path_buf()
}
pub fn kill(&mut self) -> Result<(), Error> {
self.inner_kill()?;
match self.process.wait() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}
pub fn clear_logs(&mut self) {
while self.logs.try_recv().is_ok() {}
}
fn inner_kill(&mut self) -> Result<(), Error> {
Ok(nix::sys::signal::kill(
nix::unistd::Pid::from_raw(self.process.id() as i32),
nix::sys::signal::SIGINT,
)?)
}
pub fn url(&self) -> String {
format!("http://{}:{}", self.addr, self.port)
}
pub fn bitcoin(&mut self) -> &mut Client {
&mut self.bitcoin.client
}
}
impl Drop for BlindbitD {
fn drop(&mut self) {
let _ = self.kill();
}
}