#[cfg(feature = "gossip-bootstrap")]
use super::*;
#[cfg(feature = "gossip-bootstrap")]
use tokio::sync::oneshot;
/// Commands consumed by the repair driver loop started in `spawn_repair_driver`.
#[cfg(feature = "gossip-bootstrap")]
#[derive(Debug)]
pub(crate) enum RepairCommand {
/// Ask the repair client to request the shred described by `request`; its
/// `slot`, `index` and `kind` are forwarded to `request_missing_shred`.
Request {
request: MissingShredRequest,
},
/// Forward `(source address, count)` pairs to the repair client's
/// `note_shred_sources`.
NoteShredSources {
sources: Vec<(SocketAddr, u16)>,
},
/// Hand a packet received from `from_addr` to the repair client's
/// `maybe_handle_response_ping`.
HandleResponsePing {
packet: Arc<[u8]>,
from_addr: SocketAddr,
},
/// Hand a packet received from `from_addr` to the repair client's
/// `maybe_serve_repair_request`.
HandleServeRequest {
packet: Arc<[u8]>,
from_addr: SocketAddr,
},
}
/// Results the repair driver loop reports back over its result channel.
#[cfg(feature = "gossip-bootstrap")]
#[derive(Debug)]
pub(crate) enum RepairOutcome {
/// `request_missing_shred` returned a peer address for the request.
RequestSent {
peer_addr: SocketAddr,
},
/// `request_missing_shred` succeeded but returned no peer.
RequestNoPeer {
request: MissingShredRequest,
},
/// `request_missing_shred` failed; `error` is the error's `to_string()`.
RequestError {
request: MissingShredRequest,
error: String,
},
/// `maybe_handle_response_ping` returned `Ok(true)` for a packet from `source`.
ResponsePingHandledFrom {
source: SocketAddr,
},
/// `maybe_handle_response_ping` failed for a packet from `source`.
ResponsePingError {
source: SocketAddr,
error: String,
},
/// `maybe_serve_repair_request` returned a served request for a packet from `source`.
ServeRequestHandled {
source: SocketAddr,
request: crate::repair::ServedRepairRequest,
},
/// `maybe_serve_repair_request` failed for a packet from `source`.
ServeRequestError {
source: SocketAddr,
error: String,
},
}
/// Failure modes of `spawn_repair_driver`.
#[cfg(feature = "gossip-bootstrap")]
#[derive(Debug, thiserror::Error)]
pub(crate) enum RepairDriverStartError {
/// The OS thread that hosts the repair driver could not be spawned.
#[error("failed to spawn dedicated repair thread: {source}")]
SpawnThread { source: std::io::Error },
}
/// Everything a successful `spawn_repair_driver` call hands back, in order.
#[cfg(feature = "gossip-bootstrap")]
type RepairDriverParts = (
// Sender used to submit commands to the driver loop.
mpsc::Sender<RepairCommand>,
// Receiver on which the driver loop publishes outcomes.
mpsc::Receiver<RepairOutcome>,
// Handle obtained from the repair client's `peer_snapshot_handle()`.
ArcShift<crate::repair::RepairPeerSnapshot>,
// Handle used to stop the driver thread.
RepairDriverHandle,
);
#[cfg(feature = "gossip-bootstrap")]
#[derive(Debug)]
pub(crate) struct RepairDriverHandle {
shutdown_tx: Option<oneshot::Sender<()>>,
join_handle: Option<std::thread::JoinHandle<()>>,
}
#[cfg(feature = "gossip-bootstrap")]
impl RepairDriverHandle {
#[expect(
clippy::missing_const_for_fn,
reason = "contains non-const std thread handle storage"
)]
fn new(shutdown_tx: oneshot::Sender<()>, join_handle: std::thread::JoinHandle<()>) -> Self {
Self {
shutdown_tx: Some(shutdown_tx),
join_handle: Some(join_handle),
}
}
fn abort(mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take()
&& shutdown_tx.send(()).is_err()
{
}
drop(self.join_handle.take());
}
async fn shutdown(mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take()
&& shutdown_tx.send(()).is_err()
{
}
if let Some(join_handle) = self.join_handle.take() {
drop(
tokio::task::spawn_blocking(move || {
drop(join_handle.join());
})
.await,
);
}
}
}
/// Bounded per-address hit counter for shred sources awaiting a flush to the
/// repair driver.
#[cfg(feature = "gossip-bootstrap")]
#[derive(Debug)]
pub(crate) struct RepairSourceHintBuffer {
    /// Saturating hit count per source address.
    counts: HashMap<SocketAddr, u16>,
    /// Maximum number of distinct addresses tracked at once (always >= 1).
    capacity: usize,
}

#[cfg(feature = "gossip-bootstrap")]
impl RepairSourceHintBuffer {
    /// Creates an empty buffer; a `capacity` of zero is raised to one.
    pub(crate) fn new(capacity: usize) -> Self {
        Self {
            counts: HashMap::new(),
            capacity: capacity.max(1),
        }
    }

    /// Number of distinct source addresses currently buffered.
    pub(crate) fn len(&self) -> usize {
        self.counts.len()
    }

    /// Records one hit for `source_addr`.
    ///
    /// An already-tracked address always succeeds (its count saturates at
    /// `u16::MAX`). A new address is rejected with `Err(())` when the buffer
    /// already holds `capacity` distinct addresses.
    pub(crate) fn record(&mut self, source_addr: SocketAddr) -> Result<(), ()> {
        let at_capacity = self.counts.len() >= self.capacity;
        match self.counts.get_mut(&source_addr) {
            Some(hits) => {
                *hits = hits.saturating_add(1);
                Ok(())
            }
            None if at_capacity => Err(()),
            None => {
                let _ = self.counts.insert(source_addr, 1);
                Ok(())
            }
        }
    }

    /// Removes and returns up to `max_batch_size` entries (at least one),
    /// highest hit count first; entries beyond the limit stay buffered.
    fn drain_batch(&mut self, max_batch_size: usize) -> Vec<(SocketAddr, u16)> {
        let limit = max_batch_size.max(1);
        let mut ranked: Vec<(SocketAddr, u16)> = self.counts.drain().collect();
        ranked.sort_unstable_by(|left, right| right.1.cmp(&left.1));
        if ranked.len() > limit {
            // Put the lower-ranked tail back so it is offered again next time.
            self.counts.extend(ranked.drain(limit..));
        }
        ranked
    }
}
/// Drains one batch of buffered source hints and tries to hand it to the
/// repair driver as a `RepairCommand::NoteShredSources`.
///
/// `batch_size` bounds the number of distinct sources sent (at least one).
/// `source_hint_enqueued` grows by the batch length when the command was
/// queued; otherwise (no sender, or `try_send` failed) the batch is discarded
/// and `source_hint_drops` grows by the same amount. Both counters saturate.
#[cfg(feature = "gossip-bootstrap")]
pub(crate) fn flush_repair_source_hints(
    source_hints: &mut RepairSourceHintBuffer,
    command_tx: Option<&mpsc::Sender<RepairCommand>>,
    batch_size: usize,
    source_hint_drops: &mut u64,
    source_hint_enqueued: &mut u64,
) {
    // An empty buffer yields an empty batch, so a single emptiness check suffices.
    let batch = source_hints.drain_batch(batch_size);
    if batch.is_empty() {
        return;
    }
    let batch_len = u64::try_from(batch.len()).unwrap_or(u64::MAX);
    let enqueued = command_tx.is_some_and(|sender| {
        sender
            .try_send(RepairCommand::NoteShredSources { sources: batch })
            .is_ok()
    });
    let counter = if enqueued {
        source_hint_enqueued
    } else {
        source_hint_drops
    };
    *counter = counter.saturating_add(batch_len);
}
#[cfg(feature = "gossip-bootstrap")]
pub(crate) fn spawn_repair_driver(
mut repair_client: crate::repair::GossipRepairClient,
relay_cache: Option<SharedRelayCache>,
) -> Result<RepairDriverParts, RepairDriverStartError> {
let (command_tx, mut command_rx) =
mpsc::channel::<RepairCommand>(read_repair_command_queue_capacity());
let (result_tx, result_rx) =
mpsc::channel::<RepairOutcome>(read_repair_result_queue_capacity());
let peer_snapshot = repair_client.peer_snapshot_handle();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
let driver_thread = std::thread::Builder::new()
.name(String::from("sof-repair-driver"))
.spawn(move || {
let runtime = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(runtime) => runtime,
Err(source) => {
tracing::error!(
?source,
"failed to build dedicated repair runtime"
);
return;
}
};
runtime.block_on(async move {
let mut slot_hint = 0_u64;
let mut refresh_tick = interval(Duration::from_millis(read_repair_peer_refresh_ms()));
refresh_tick.tick().await;
let _ = repair_client.refresh_peer_snapshot(slot_hint);
loop {
tokio::select! {
_ = &mut shutdown_rx => {
break;
}
maybe_command = command_rx.recv() => {
let Some(command) = maybe_command else {
break;
};
match command {
RepairCommand::Request { request } => {
slot_hint = slot_hint.max(request.slot);
let outcome = match repair_client
.request_missing_shred(request.slot, request.index, request.kind)
.await
{
Ok(Some(peer_addr)) => RepairOutcome::RequestSent { peer_addr },
Ok(None) => RepairOutcome::RequestNoPeer { request },
Err(error) => RepairOutcome::RequestError {
request,
error: error.to_string(),
},
};
if result_tx.try_send(outcome).is_err() {
}
}
RepairCommand::NoteShredSources { sources } => {
let _ = repair_client.note_shred_sources(&sources);
}
RepairCommand::HandleResponsePing { packet, from_addr } => {
match repair_client
.maybe_handle_response_ping(packet.as_ref(), from_addr)
.await
{
Ok(true) => {
if result_tx
.try_send(RepairOutcome::ResponsePingHandledFrom {
source: from_addr,
})
.is_err()
{
}
}
Ok(false) => {}
Err(error) => {
if result_tx
.try_send(RepairOutcome::ResponsePingError {
source: from_addr,
error: error.to_string(),
})
.is_err()
{
}
}
}
}
RepairCommand::HandleServeRequest { packet, from_addr } => {
match repair_client
.maybe_serve_repair_request(
packet.as_ref(),
from_addr,
relay_cache.as_ref(),
)
.await
{
Ok(Some(request)) => {
if result_tx
.try_send(RepairOutcome::ServeRequestHandled {
source: from_addr,
request,
})
.is_err()
{
}
}
Ok(None) => {}
Err(error) => {
if result_tx
.try_send(RepairOutcome::ServeRequestError {
source: from_addr,
error: error.to_string(),
})
.is_err()
{
}
}
}
}
}
}
_ = refresh_tick.tick() => {
let _ = repair_client.refresh_peer_snapshot(slot_hint);
}
}
}
});
})
.map_err(|source| RepairDriverStartError::SpawnThread { source })?;
let driver_handle = RepairDriverHandle::new(shutdown_tx, driver_thread);
Ok((command_tx, result_rx, peer_snapshot, driver_handle))
}
/// Stops the current repair driver (if any) without waiting for it and starts
/// a new one around `repair_client`.
///
/// All four slots are cleared first. They are repopulated only when the new
/// driver spawns successfully; on failure the error is logged and the slots
/// stay `None`.
#[cfg(feature = "gossip-bootstrap")]
pub(crate) fn replace_repair_driver(
    repair_client: crate::repair::GossipRepairClient,
    relay_cache: Option<SharedRelayCache>,
    repair_command_tx: &mut Option<mpsc::Sender<RepairCommand>>,
    repair_result_rx: &mut Option<mpsc::Receiver<RepairOutcome>>,
    repair_peer_snapshot: &mut Option<ArcShift<crate::repair::RepairPeerSnapshot>>,
    repair_driver_handle: &mut Option<RepairDriverHandle>,
) {
    // Release the old endpoints before signalling the old driver to stop.
    drop(repair_command_tx.take());
    drop(repair_result_rx.take());
    drop(repair_peer_snapshot.take());
    if let Some(previous) = repair_driver_handle.take() {
        previous.abort();
    }

    let (command_tx, result_rx, peer_snapshot, driver_handle) =
        match spawn_repair_driver(repair_client, relay_cache) {
            Ok(parts) => parts,
            Err(error) => {
                tracing::error!(?error, "failed to replace repair driver");
                return;
            }
        };
    *repair_command_tx = Some(command_tx);
    *repair_result_rx = Some(result_rx);
    *repair_peer_snapshot = Some(peer_snapshot);
    *repair_driver_handle = Some(driver_handle);
}
/// Clears every repair driver slot and, when a driver is running, signals it
/// to stop and waits for its thread to exit.
#[cfg(feature = "gossip-bootstrap")]
pub(crate) async fn stop_repair_driver(
    repair_command_tx: &mut Option<mpsc::Sender<RepairCommand>>,
    repair_result_rx: &mut Option<mpsc::Receiver<RepairOutcome>>,
    repair_peer_snapshot: &mut Option<ArcShift<crate::repair::RepairPeerSnapshot>>,
    repair_driver_handle: &mut Option<RepairDriverHandle>,
) {
    // Release the endpoints first, then stop the driver itself.
    drop(repair_command_tx.take());
    drop(repair_result_rx.take());
    drop(repair_peer_snapshot.take());
    let Some(running) = repair_driver_handle.take() else {
        return;
    };
    running.shutdown().await;
}