use crate::job_declaration_protocol::{
error::BitcoinCoreSv2JDPError, io::JdRequest, mempool::MempoolMirror,
};
use async_channel::Receiver;
use bitcoin_capnp_types::{
init_capnp::init::Client as InitIpcClient,
mining_capnp::{
block_template::Client as BlockTemplateIpcClient, mining::Client as MiningIpcClient,
},
proxy_capnp::{thread::Client as ThreadIpcClient, thread_map::Client as ThreadMapIpcClient},
};
use capnp_rpc::{RpcSystem, rpc_twoparty_capnp, twoparty};
use std::{cell::RefCell, path::Path, rc::Rc};
use stratum_core::bitcoin::{Block, consensus::deserialize};
use tokio::net::UnixStream;
use tokio_util::compat::*;
pub use tokio_util::sync::CancellationToken;
use tracing::info;
pub mod error;
mod handlers;
pub mod io;
mod mempool;
mod monitors;
/// Job Declaration Protocol client that talks to Bitcoin Core over Cap'n Proto IPC.
///
/// The struct bundles the IPC capabilities obtained during [`BitcoinCoreSv2JDP::new`]
/// together with the local state (a mempool mirror) and the channel on which
/// [`JdRequest`]s arrive.
///
/// Cloning is shallow for the `Rc` fields: every clone shares the same current
/// block template client and the same mempool mirror. Because of the `Rc<RefCell<_>>`
/// fields the type is neither `Send` nor `Sync`.
#[derive(Clone)]
pub struct BitcoinCoreSv2JDP {
/// Thread-map capability; used to issue `make_thread` requests.
thread_map: ThreadMapIpcClient,
/// Thread capability that is set on the context of outgoing IPC requests.
thread_ipc_client: ThreadIpcClient,
/// Mining capability; used for `create_new_block` and `interrupt` requests.
mining_ipc_client: MiningIpcClient,
/// Most recently obtained block template capability. It is replaced whenever
/// `force_update_mempool_mirror` creates a new template.
current_template_ipc_client: Rc<RefCell<BlockTemplateIpcClient>>,
/// Token observed by `new` and `run` to stop work; `run` also cancels it when the
/// incoming request channel is closed.
cancellation_token: CancellationToken,
/// Local mirror that is updated from the block fetched from the current template.
mempool_mirror: Rc<RefCell<MempoolMirror>>,
/// Receiving side of the channel that delivers requests to `run`.
incoming_requests: Receiver<JdRequest>,
}
impl BitcoinCoreSv2JDP {
/// Connects to Bitcoin Core over the given UNIX socket and builds a ready-to-run
/// [`BitcoinCoreSv2JDP`].
///
/// The setup performs, in order:
/// 1. connect to the socket and start a Cap'n Proto two-party RPC system on it;
/// 2. `construct` on the bootstrap capability to obtain the thread map;
/// 3. `make_thread` to obtain the thread capability used as request context;
/// 4. `make_mining` to obtain the mining capability;
/// 5. `create_new_block` (with `use_mempool = true`) to obtain the initial template;
/// 6. one `update_mempool_mirror` call to bootstrap the mempool mirror;
/// 7. send `()` on `ready_tx` to signal readiness.
///
/// # Parameters
/// - `bitcoin_core_unix_socket_path`: path of the UNIX socket to connect to.
/// - `incoming_requests`: channel from which `run` will later read [`JdRequest`]s.
/// - `cancellation_token`: if it is cancelled while the initial `create_new_block`
///   request is pending, an `interrupt` request is sent and an error is returned.
/// - `ready_tx`: receives `()` once the instance is fully initialised.
///
/// # Errors
/// - [`BitcoinCoreSv2JDPError::CannotConnectToUnixSocket`] if the socket connection fails.
/// - Any Cap'n Proto error raised by the IPC requests (propagated with `?`).
/// - The error returned by the initial mempool mirror bootstrap.
/// - [`BitcoinCoreSv2JDPError::ReadinessSignalFailed`] if the receiver of `ready_tx`
///   has already been dropped.
///
/// NOTE(review): this calls `tokio::task::spawn_local`, which is expected to run
/// inside a tokio `LocalSet` (or equivalent local runtime) — confirm at call sites.
pub async fn new<P>(
bitcoin_core_unix_socket_path: P,
incoming_requests: Receiver<JdRequest>,
cancellation_token: CancellationToken,
ready_tx: tokio::sync::oneshot::Sender<()>,
) -> Result<Self, BitcoinCoreSv2JDPError>
where
P: AsRef<Path>,
{
let bitcoin_core_unix_socket_path = bitcoin_core_unix_socket_path.as_ref();
info!(
"Creating new BitcoinCoreSv2JDP via IPC over UNIX socket: {}",
bitcoin_core_unix_socket_path.display()
);
let stream = UnixStream::connect(bitcoin_core_unix_socket_path)
.await
.map_err(|e| {
BitcoinCoreSv2JDPError::CannotConnectToUnixSocket(
bitcoin_core_unix_socket_path.into(),
e.to_string(),
)
})?;
// Split the stream and wrap both halves with the tokio_util compat adapters
// before handing them to the Cap'n Proto two-party network.
let (reader, writer) = stream.into_split();
let reader_compat = reader.compat();
let writer_compat = writer.compat_write();
let rpc_network = Box::new(twoparty::VatNetwork::new(
reader_compat,
writer_compat,
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
// We act as the client side and fetch the bootstrap capability of the server side.
let mut rpc_system = RpcSystem::new(rpc_network, None);
let bootstrap_client: InitIpcClient =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
// The RPC system is driven by a task spawned on the local task set.
tokio::task::spawn_local(rpc_system);
// `construct` yields the thread map, from which a thread capability is created.
let construct_response = bootstrap_client.construct_request().send().promise.await?;
let thread_map: ThreadMapIpcClient = construct_response.get()?.get_thread_map()?;
let thread_request = thread_map.make_thread_request();
let thread_response = thread_request.send().promise.await?;
let thread_ipc_client: ThreadIpcClient = thread_response.get()?.get_result()?;
info!("IPC execution thread client successfully created.");
// Obtain the mining capability, passing the thread capability in the request context.
let mut mining_client_request = bootstrap_client.make_mining_request();
mining_client_request
.get()
.get_context()?
.set_thread(thread_ipc_client.clone());
let mining_client_response = mining_client_request.send().promise.await?;
let mining_ipc_client: MiningIpcClient = mining_client_response.get()?.get_result()?;
// Request the initial block template with `use_mempool` enabled.
let mut template_ipc_client_request = mining_ipc_client.create_new_block_request();
template_ipc_client_request
.get()
.get_context()?
.set_thread(thread_ipc_client.clone());
let mut template_ipc_client_request_options = template_ipc_client_request
.get()
.get_options()
.map_err(|e| {
tracing::error!("Failed to get template IPC client request options: {e}");
e
})?;
template_ipc_client_request_options.set_use_mempool(true);
tracing::debug!("Sending createNewBlock request to Bitcoin Core");
let create_new_block_promise = template_ipc_client_request.send().promise;
// Race the createNewBlock response against cancellation so that a shutdown
// requested during start-up does not wait for the response.
let template_ipc_client_response = tokio::select! {
template_ipc_client_response = create_new_block_promise => {
template_ipc_client_response.map_err(|e| {
tracing::error!("Failed to send template IPC client request: {}", e);
e
})?
}
_ = cancellation_token.cancelled() => {
tracing::debug!("Interrupting initial createNewBlock request");
// If sending the interrupt itself fails, that error is returned instead
// of the generic "interrupted" error below.
Self::interrupt_create_new_block_request(&mining_ipc_client).await?;
return Err(capnp::Error::failed(
"createNewBlock request interrupted during shutdown".to_string(),
)
.into());
}
};
let template_ipc_client_result = template_ipc_client_response.get().map_err(|e| {
tracing::error!("Failed to get template IPC client result: {}", e);
e
})?;
let template_ipc_client = template_ipc_client_result.get_result().map_err(|e| {
tracing::error!("Failed to get template IPC client result: {}", e);
e
})?;
info!("IPC JDP client successfully created.");
let self_ = Self {
thread_map,
thread_ipc_client,
mining_ipc_client,
current_template_ipc_client: Rc::new(RefCell::new(template_ipc_client)),
cancellation_token,
mempool_mirror: Rc::new(RefCell::new(MempoolMirror::new())),
incoming_requests,
};
// Populate the mempool mirror once before reporting readiness.
tracing::debug!("Bootstrapping initial mempool state");
if let Err(e) = self_.update_mempool_mirror().await {
tracing::error!("Failed to bootstrap mempool mirror: {:?}", e);
return Err(e);
}
tracing::debug!("Initial mempool state bootstrapped successfully");
// `send` fails only when the receiving half has been dropped.
ready_tx.send(()).map_err(|_| {
tracing::error!("Ready signal receiver dropped - caller gave up waiting");
BitcoinCoreSv2JDPError::ReadinessSignalFailed
})?;
Ok(self_)
}
/// Asks the thread map for a fresh thread capability.
///
/// # Errors
/// Every failure (sending the request, reading the response, extracting the
/// result) is logged and reported as
/// [`BitcoinCoreSv2JDPError::FailedToCreateThreadIpcClient`] carrying a description.
async fn new_thread_ipc_client(&self) -> Result<ThreadIpcClient, BitcoinCoreSv2JDPError> {
    // Shared tail of all three failure paths: log the description, then wrap it.
    let fail = |details: String| -> BitcoinCoreSv2JDPError {
        tracing::error!("{}", details);
        BitcoinCoreSv2JDPError::FailedToCreateThreadIpcClient(details)
    };

    let response = self
        .thread_map
        .make_thread_request()
        .send()
        .promise
        .await
        .map_err(|e| fail(format!("Failed to send make_thread request: {}", e)))?;

    let reader = response
        .get()
        .map_err(|e| fail(format!("Failed to read make_thread response: {}", e)))?;

    let client = reader
        .get_result()
        .map_err(|e| fail(format!("Failed to get thread IPC client: {}", e)))?;

    Ok(client)
}
/// Sends an `interrupt` request on the given mining capability.
///
/// # Errors
/// A failure to send the request is logged and returned as
/// [`BitcoinCoreSv2JDPError::CapnpError`].
async fn interrupt_create_new_block_request(
    mining_ipc_client: &MiningIpcClient,
) -> Result<(), BitcoinCoreSv2JDPError> {
    match mining_ipc_client.interrupt_request().send().promise.await {
        Ok(_) => Ok(()),
        Err(e) => {
            tracing::error!("Failed to send interrupt createNewBlock request: {}", e);
            Err(BitcoinCoreSv2JDPError::CapnpError(e))
        }
    }
}
pub async fn run(&self) {
let monitor_handle = self.monitor_and_update_mempool_mirror();
loop {
tokio::select! {
_ = self.cancellation_token.cancelled() => {
tracing::info!("BitcoinCoreSv2JDP shutting down");
break;
}
request = self.incoming_requests.recv() => {
match request {
Ok(request) => {
self.process_request(request).await;
}
Err(_) => {
tracing::info!("Incoming requests channel closed");
self.cancellation_token.cancel();
break;
}
}
}
}
}
tracing::debug!("Waiting for monitor_mempool_mirror() task to finish");
match monitor_handle.await {
Ok(()) => {
tracing::debug!("monitor_mempool_mirror() task finished successfully");
}
Err(e) => {
tracing::error!(
"error waiting for monitor_mempool_mirror task to finish: {:?}",
e
);
}
}
}
/// Fetches the block of the current template over IPC, deserializes it and feeds
/// it into the mempool mirror.
///
/// # Errors
/// Propagates Cap'n Proto errors from the `get_block` request and returns
/// [`BitcoinCoreSv2JDPError::FailedToDeserializeBlock`] when the returned bytes
/// are not a valid block.
async fn update_mempool_mirror(&self) -> Result<(), BitcoinCoreSv2JDPError> {
    // The `RefCell` borrow ends with this statement, so it is not held across `.await`.
    let mut request = self
        .current_template_ipc_client
        .borrow()
        .get_block_request();
    let thread = self.thread_ipc_client.clone();
    request.get().get_context()?.set_thread(thread);

    let response = request.send().promise.await?;
    let raw_block = response.get()?.get_result()?.to_vec();
    // The bytes were copied out; release the response before further processing.
    drop(response);

    tracing::debug!("Deserializing block ({} bytes)", raw_block.len());
    let block: Block =
        deserialize(&raw_block).map_err(BitcoinCoreSv2JDPError::FailedToDeserializeBlock)?;

    self.mempool_mirror.borrow_mut().update(&block);
    Ok(())
}
/// Requests a brand-new block template from Bitcoin Core, installs it as the
/// current template and refreshes the mempool mirror from it.
///
/// A whole attempt (create template, swap it in, refresh the mirror) is retried
/// when it fails with an error for which `is_thread_busy()` is true. At most
/// `MAX_ATTEMPTS` attempts are made, with a `RETRY_BACKOFF_MS` pause between them.
///
/// # Errors
/// Returns the error of the failing attempt: immediately for any error that is
/// not "thread busy", or the last "thread busy" error once all attempts are used.
pub(crate) async fn force_update_mempool_mirror(&self) -> Result<(), BitcoinCoreSv2JDPError> {
    const MAX_ATTEMPTS: usize = 3;
    const RETRY_BACKOFF_MS: u64 = 25;

    // 1-based attempt counter. The loop can only be left through `return`, so no
    // "retries exhausted" fallback value is needed after it.
    let mut attempt: usize = 1;
    loop {
        let result = async {
            let mut create_new_block_request =
                self.mining_ipc_client.create_new_block_request();
            create_new_block_request
                .get()
                .get_context()
                .map_err(|e| {
                    tracing::error!("Failed to get template IPC client request context: {e}");
                    e
                })?
                .set_thread(self.thread_ipc_client.clone());
            let mut create_new_block_options =
                create_new_block_request.get().get_options().map_err(|e| {
                    tracing::error!("Failed to get createNewBlock options: {e}");
                    e
                })?;
            create_new_block_options.set_use_mempool(true);
            let create_new_block_response =
                create_new_block_request.send().promise.await.map_err(|e| {
                    tracing::error!("Failed to send createNewBlock request: {e}");
                    e
                })?;
            let new_template_ipc_client = create_new_block_response
                .get()
                .map_err(|e| {
                    tracing::error!("Failed to read createNewBlock response: {e}");
                    e
                })?
                .get_result()
                .map_err(|e| {
                    tracing::error!("Failed to get BlockTemplate from createNewBlock: {e}");
                    e
                })?;
            {
                // Keep the mutable borrow in its own scope so it is released
                // before the `.await` below.
                let mut current_template_ipc_client =
                    self.current_template_ipc_client.borrow_mut();
                *current_template_ipc_client = new_template_ipc_client;
            }
            self.update_mempool_mirror().await
        }
        .await;

        match result {
            Ok(()) => return Ok(()),
            // Only "thread busy" failures are retried, and only while attempts remain.
            Err(e) if e.is_thread_busy() && attempt < MAX_ATTEMPTS => {
                tracing::warn!(
                    error = ?e,
                    attempt,
                    max_attempts = MAX_ATTEMPTS,
                    "Transient IPC contention during force_update_mempool_mirror (thread busy); retrying"
                );
                tokio::time::sleep(std::time::Duration::from_millis(RETRY_BACKOFF_MS)).await;
                attempt += 1;
            }
            // Non-retryable error, or the final attempt failed.
            Err(e) => return Err(e),
        }
    }
}
/// Dispatches one [`JdRequest`] to the handler for its variant and waits for the
/// handler to complete.
async fn process_request(&self, request: JdRequest) {
    match request {
        JdRequest::PushSolution { push_solution } => {
            let handling = self.handle_push_solution(push_solution);
            handling.await;
        }
        JdRequest::DeclareMiningJob {
            version,
            coinbase_tx,
            wtxid_list,
            missing_txs,
            response_tx,
        } => {
            let handling = self.handle_declare_mining_job(
                version,
                coinbase_tx,
                wtxid_list,
                missing_txs,
                response_tx,
            );
            handling.await;
        }
    }
}
/// Sends an `interrupt_wait` request on the current block template capability.
///
/// # Errors
/// A failure to send the request is logged and returned as
/// [`BitcoinCoreSv2JDPError::CapnpError`].
async fn interrupt_wait_request(&self) -> Result<(), BitcoinCoreSv2JDPError> {
    // Work on a clone so the `RefCell` borrow is not held across the `.await`.
    let template = self.current_template_ipc_client.borrow().clone();
    let outcome = template.interrupt_wait_request().send().promise.await;
    outcome.map(|_| ()).map_err(|e| {
        tracing::error!("Failed to send interrupt wait request: {}", e);
        BitcoinCoreSv2JDPError::CapnpError(e)
    })
}
}