use crate::grpc::dfdaemon_upload::DfdaemonUploadClient;
use crate::resource::piece_collector::CollectedParent;
use dashmap::DashMap;
use dragonfly_api::common::v2::{Network, Peer, PersistentCachePeer, PersistentPeer};
use dragonfly_api::dfdaemon::v2::SyncHostRequest;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result;
use dragonfly_client_util::id_generator::IDGenerator;
use dragonfly_client_util::net::format_url;
use dragonfly_client_util::shutdown::{self, Shutdown};
use rand::distr::weighted::WeightedIndex;
use rand::distr::Distribution;
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tracing::{debug, error, instrument, warn, Instrument};
/// Fallback selection weight for a parent host whose network info is missing
/// or whose reported `max_tx_bandwidth` is 0.
///
/// NOTE(review): presumably 10 Gbps expressed in the same unit as
/// `max_tx_bandwidth` — confirm.
const DEFAULT_NETWORK_WEIGHT: u64 = 10 * 1_000_000_000;
/// Per-parent-host state shared by every registration of that host.
struct Connection {
    /// Signal that stops the host's background sync task.
    shutdown: Shutdown,

    /// Number of registrations currently sharing this connection; the
    /// selectors tear the connection down once it reaches zero.
    active_requests: Arc<AtomicUsize>,
}
impl Connection {
    /// Creates a connection with no active requests and a fresh shutdown signal.
    pub fn new() -> Self {
        Self {
            active_requests: Arc::new(AtomicUsize::new(0)),
            shutdown: Shutdown::new(),
        }
    }

    /// Returns the number of registrations currently sharing this connection.
    pub fn active_requests(&self) -> usize {
        self.active_requests.load(Ordering::SeqCst)
    }

    /// Records one more registration sharing this connection.
    pub fn increment_request(&self) {
        self.active_requests.fetch_add(1, Ordering::SeqCst);
    }

    /// Records that one registration has been released.
    ///
    /// Saturates at zero. An unbalanced extra call must not wrap the counter
    /// to `usize::MAX`, because then the count would never return to zero and
    /// the connection would never be cleaned up.
    pub fn decrement_request(&self) {
        // `checked_sub` makes `fetch_update` refuse the update when the count
        // is already 0; the resulting `Err` is the intended no-op.
        let _ = self
            .active_requests
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
                count.checked_sub(1)
            });
    }

    /// Triggers this connection's shutdown signal, stopping the sync task
    /// that listens on it.
    pub fn shutdown(&self) {
        self.shutdown.trigger();
    }
}
/// Chooses a parent to download from, weighted by each parent host's
/// bandwidth info, which is kept fresh by per-host `SyncHost` streams.
pub struct ParentSelector {
    // Daemon configuration, passed to every `DfdaemonUploadClient` created.
    config: Arc<Config>,
    // Source of the host/peer ids sent in `SyncHostRequest`.
    id_generator: Arc<IDGenerator>,
    // Latest selection weight keyed by parent host id.
    weights: Arc<DashMap<String, u64>>,
    // Reference-counted sync connection keyed by parent host id.
    connections: Arc<DashMap<String, Connection>>,
    // Daemon-wide shutdown signal; a clone is given to every sync task.
    shutdown: shutdown::Shutdown,
    // Held but never used directly here.
    // NOTE(review): presumably dropped with the selector to signal shutdown
    // completion — confirm.
    _shutdown_complete: mpsc::UnboundedSender<()>,
}
impl ParentSelector {
#[instrument(skip_all)]
pub fn new(
config: Arc<Config>,
id_generator: Arc<IDGenerator>,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> ParentSelector {
Self {
config,
id_generator,
weights: Arc::new(DashMap::new()),
connections: Arc::new(DashMap::new()),
shutdown,
_shutdown_complete: shutdown_complete_tx,
}
}
#[instrument(skip_all)]
pub fn select(&self, parents: Vec<CollectedParent>) -> CollectedParent {
let weights: Vec<u64> = parents
.iter()
.map(|parent| {
let Some(parent_host) = parent.host.as_ref() else {
warn!(
"parent {} has no host info, defaulting weight to 0",
parent.id
);
return 0;
};
let parent_host_id = parent_host.id.clone();
self.weights
.get(&parent_host_id)
.map(|w| *w)
.unwrap_or_else(|| {
debug!(
"no weight info for parent {} {}, defaulting weight to 0",
parent.id, parent_host_id
);
0
})
})
.collect();
match WeightedIndex::new(weights) {
Ok(dist) => {
let mut rng = rand::rng();
let index = dist.sample(&mut rng);
let selected_parent = &parents[index];
selected_parent.clone()
}
Err(_) => parents[fastrand::usize(..parents.len())].clone(),
}
}
#[instrument(skip_all)]
pub async fn register(&self, parents: &[Peer]) -> Result<()> {
let dfdaemon_shutdown = self.shutdown.clone();
let mut join_set = JoinSet::new();
for parent in parents {
debug!("register parent {}", parent.id);
let Some(parent_host) = parent.host.as_ref() else {
error!("parent {} has no host info, skipping", parent.id);
continue;
};
let parent_host_id = parent_host.id.clone();
if let Some(connection) = self.connections.get(&parent_host_id) {
connection.increment_request();
continue;
}
let dfdaemon_upload_client = match DfdaemonUploadClient::new(
self.config.clone(),
format_url(
"http",
IpAddr::from_str(&parent_host.ip)?,
parent_host.port as u16,
),
false,
)
.await
{
Ok(client) => client,
Err(err) => {
error!("failed to connect to parent {}: {}", parent_host_id, err);
continue;
}
};
let weight = match parent_host.network.as_ref() {
Some(network) => Self::calculate_weight_by_network(network),
None => DEFAULT_NETWORK_WEIGHT,
};
self.weights.entry(parent_host_id.clone()).or_insert(weight);
match self.connections.entry(parent_host_id.clone()) {
dashmap::mapref::entry::Entry::Occupied(entry) => {
entry.get().increment_request();
continue;
}
dashmap::mapref::entry::Entry::Vacant(entry) => {
let connection = Connection::new();
connection.increment_request();
let shutdown = connection.shutdown.clone();
entry.insert(connection);
let weights = self.weights.clone();
let host_id = self.id_generator.host_id();
let peer_id = self.id_generator.peer_id();
let dfdaemon_shutdown_clone = dfdaemon_shutdown.clone();
join_set.spawn(
Self::sync_host(
host_id,
peer_id,
parent_host_id.clone(),
weights,
dfdaemon_upload_client,
shutdown,
dfdaemon_shutdown_clone,
)
.in_current_span(),
);
}
}
}
tokio::spawn(async move {
while let Some(message) = join_set.join_next().await {
match message {
Ok(Ok(_)) => debug!("sync host info completed"),
Ok(Err(err)) => error!("sync host info failed: {}", err),
Err(err) => error!("task join error: {}", err),
}
}
});
Ok(())
}
#[instrument(skip_all)]
pub fn unregister(&self, parents: &[Peer]) {
for parent in parents {
debug!("unregister parent {}", parent.id);
let Some(parent_host) = parent.host.as_ref() else {
warn!("parent {} has no host info, skipping", parent.id);
continue;
};
let parent_host_id = parent_host.id.clone();
if let Some(connection) = self.connections.get(&parent_host_id) {
connection.decrement_request();
if connection.active_requests() == 0 {
debug!("cleaning up parent {} connection", parent_host_id);
connection.shutdown();
drop(connection);
self.weights.remove(&parent_host_id);
self.connections.remove(&parent_host_id);
}
}
}
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn sync_host(
host_id: String,
peer_id: String,
parent_host_id: String,
weights: Arc<DashMap<String, u64>>,
dfdaemon_upload_client: DfdaemonUploadClient,
mut shutdown: Shutdown,
mut dfdaemon_shutdown: Shutdown,
) -> Result<()> {
debug!("sync host info from parent {}", parent_host_id);
let response = dfdaemon_upload_client
.sync_host(SyncHostRequest { host_id, peer_id })
.await
.inspect_err(|err| {
error!(
"sync host info from parent {} failed: {}",
parent_host_id, err
);
})?;
let out_stream = response.into_inner();
tokio::pin!(out_stream);
loop {
tokio::select! {
result = out_stream.try_next() => {
match result.inspect_err(|err| {
error!("sync host info from parent {} failed: {}", parent_host_id, err);
})? {
Some(message) => {
let weight = match message.network.as_ref() {
Some(network) => Self::calculate_weight_by_network(network),
None => DEFAULT_NETWORK_WEIGHT,
};
debug!("update host {} weight to {}", parent_host_id, weight);
weights.insert(parent_host_id.clone(), weight);
}
None => break,
}
}
_ = shutdown.recv() => {
debug!("sync host info from parent {} shutting down", parent_host_id);
break;
}
_ = dfdaemon_shutdown.recv() => {
debug!("parent selector shutting down");
break;
}
}
}
Ok(())
}
fn calculate_weight_by_network(network: &Network) -> u64 {
let tx_bw = network.tx_bandwidth();
let max_bw = network.max_tx_bandwidth;
if max_bw == 0 {
return DEFAULT_NETWORK_WEIGHT;
}
max_bw.saturating_sub(tx_bw).max(max_bw / 10)
}
}
/// Chooses a persistent parent to download from, weighted by each parent
/// host's bandwidth info, which is kept fresh by per-host `SyncHost` streams.
pub struct PersistentParentSelector {
    // Daemon configuration, passed to every `DfdaemonUploadClient` created.
    config: Arc<Config>,
    // Source of the host/peer ids sent in `SyncHostRequest`.
    id_generator: Arc<IDGenerator>,
    // Latest selection weight keyed by parent host id.
    weights: Arc<DashMap<String, u64>>,
    // Reference-counted sync connection keyed by parent host id.
    connections: Arc<DashMap<String, Connection>>,
    // Daemon-wide shutdown signal; a clone is given to every sync task.
    shutdown: shutdown::Shutdown,
    // Held but never used directly here.
    // NOTE(review): presumably dropped with the selector to signal shutdown
    // completion — confirm.
    _shutdown_complete: mpsc::UnboundedSender<()>,
}
impl PersistentParentSelector {
#[instrument(skip_all)]
pub fn new(
config: Arc<Config>,
id_generator: Arc<IDGenerator>,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> PersistentParentSelector {
Self {
config,
id_generator,
weights: Arc::new(DashMap::new()),
connections: Arc::new(DashMap::new()),
shutdown,
_shutdown_complete: shutdown_complete_tx,
}
}
#[instrument(skip_all)]
pub fn select(&self, parents: Vec<CollectedParent>) -> CollectedParent {
let weights: Vec<u64> = parents
.iter()
.map(|parent| {
let Some(parent_host) = parent.host.as_ref() else {
warn!(
"persistent parent {} has no host info, defaulting weight to 0",
parent.id
);
return 0;
};
let parent_host_id = parent_host.id.clone();
self.weights
.get(&parent_host_id)
.map(|w| *w)
.unwrap_or_else(|| {
debug!(
"no weight info for persistent parent {} {}, defaulting weight to 0",
parent.id, parent_host_id
);
0
})
})
.collect();
match WeightedIndex::new(weights) {
Ok(dist) => {
let mut rng = rand::rng();
let index = dist.sample(&mut rng);
let selected_parent = &parents[index];
debug!("selected persistent parent {}", selected_parent.id);
selected_parent.clone()
}
Err(_) => parents[fastrand::usize(..parents.len())].clone(),
}
}
#[instrument(skip_all)]
pub async fn register(&self, parents: &[PersistentPeer]) -> Result<()> {
let dfdaemon_shutdown = self.shutdown.clone();
let mut join_set = JoinSet::new();
for parent in parents {
debug!("register persistent parent {}", parent.id);
let Some(parent_host) = parent.host.as_ref() else {
warn!("persistent parent {} has no host info, skipping", parent.id);
continue;
};
let parent_host_id = parent_host.id.clone();
if let Some(connection) = self.connections.get(&parent_host_id) {
connection.increment_request();
continue;
}
let dfdaemon_upload_client = match DfdaemonUploadClient::new(
self.config.clone(),
format_url(
"http",
IpAddr::from_str(&parent_host.ip)?,
parent_host.port as u16,
),
false,
)
.await
{
Ok(client) => client,
Err(err) => {
error!("failed to connect to parent {}: {}", parent_host_id, err);
continue;
}
};
let weight = match parent_host.network.as_ref() {
Some(network) => Self::calculate_weight_by_network(network),
None => DEFAULT_NETWORK_WEIGHT,
};
self.weights.entry(parent_host_id.clone()).or_insert(weight);
match self.connections.entry(parent_host_id.clone()) {
dashmap::mapref::entry::Entry::Occupied(entry) => {
entry.get().increment_request();
continue;
}
dashmap::mapref::entry::Entry::Vacant(entry) => {
let connection = Connection::new();
connection.increment_request();
let shutdown = connection.shutdown.clone();
entry.insert(connection);
let weights = self.weights.clone();
let host_id = self.id_generator.host_id();
let peer_id = self.id_generator.peer_id();
let dfdaemon_shutdown_clone = dfdaemon_shutdown.clone();
join_set.spawn(
Self::sync_host(
host_id,
peer_id,
parent_host_id.clone(),
weights,
dfdaemon_upload_client,
shutdown,
dfdaemon_shutdown_clone,
)
.in_current_span(),
);
}
}
}
tokio::spawn(async move {
while let Some(message) = join_set.join_next().await {
match message {
Ok(Ok(_)) => debug!("sync host info completed"),
Ok(Err(err)) => error!("sync host info failed: {}", err),
Err(err) => error!("task join error: {}", err),
}
}
});
Ok(())
}
#[instrument(skip_all)]
pub fn unregister(&self, parents: &[PersistentPeer]) {
for parent in parents {
debug!("unregister persistent parent {}", parent.id);
let Some(parent_host) = parent.host.as_ref() else {
warn!("persistent parent {} has no host info, skipping", parent.id);
continue;
};
let parent_host_id = parent_host.id.clone();
if let Some(connection) = self.connections.get(&parent_host_id) {
connection.decrement_request();
if connection.active_requests() == 0 {
debug!("cleaning up parent {} connection", parent_host_id);
connection.shutdown();
drop(connection);
self.weights.remove(&parent_host_id);
self.connections.remove(&parent_host_id);
}
}
}
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn sync_host(
host_id: String,
peer_id: String,
parent_host_id: String,
weights: Arc<DashMap<String, u64>>,
dfdaemon_upload_client: DfdaemonUploadClient,
mut shutdown: Shutdown,
mut dfdaemon_shutdown: Shutdown,
) -> Result<()> {
debug!("sync host info from persistent parent {}", parent_host_id);
let response = dfdaemon_upload_client
.sync_host(SyncHostRequest { host_id, peer_id })
.await
.inspect_err(|err| {
error!(
"sync host info from persistent parent {} failed: {}",
parent_host_id, err
);
})?;
let out_stream = response.into_inner();
tokio::pin!(out_stream);
loop {
tokio::select! {
result = out_stream.try_next() => {
match result.inspect_err(|err| {
error!("sync host info from persistent parent {} failed: {}", parent_host_id, err);
})? {
Some(message) => {
let weight = match message.network.as_ref() {
Some(network) => Self::calculate_weight_by_network(network),
None => DEFAULT_NETWORK_WEIGHT,
};
debug!("update host {} weight to {}", parent_host_id, weight);
weights.insert(parent_host_id.clone(), weight);
}
None => break,
}
}
_ = shutdown.recv() => {
debug!("sync host info from persistent parent {} shutting down", parent_host_id);
break;
}
_ = dfdaemon_shutdown.recv() => {
debug!("persistent parent selector shutting down");
break;
}
}
}
Ok(())
}
fn calculate_weight_by_network(network: &Network) -> u64 {
let tx_bw = network.tx_bandwidth();
let max_bw = network.max_tx_bandwidth;
if max_bw == 0 {
return DEFAULT_NETWORK_WEIGHT;
}
max_bw.saturating_sub(tx_bw).max(max_bw / 10)
}
}
/// Chooses a persistent cache parent to download from, weighted by each
/// parent host's bandwidth info, which is kept fresh by per-host `SyncHost`
/// streams.
pub struct PersistentCacheParentSelector {
    // Daemon configuration, passed to every `DfdaemonUploadClient` created.
    config: Arc<Config>,
    // Source of the host/peer ids sent in `SyncHostRequest`.
    id_generator: Arc<IDGenerator>,
    // Latest selection weight keyed by parent host id.
    weights: Arc<DashMap<String, u64>>,
    // Reference-counted sync connection keyed by parent host id.
    connections: Arc<DashMap<String, Connection>>,
    // Daemon-wide shutdown signal; a clone is given to every sync task.
    shutdown: shutdown::Shutdown,
    // Held but never used directly here.
    // NOTE(review): presumably dropped with the selector to signal shutdown
    // completion — confirm.
    _shutdown_complete: mpsc::UnboundedSender<()>,
}
impl PersistentCacheParentSelector {
    /// Creates a selector with empty weight and connection tables.
    ///
    /// `shutdown` is the daemon-wide signal; a clone of it is handed to every
    /// sync task started by `register`. `shutdown_complete_tx` is stored but
    /// not used by any method of this selector.
    #[instrument(skip_all)]
    pub fn new(
        config: Arc<Config>,
        id_generator: Arc<IDGenerator>,
        shutdown: shutdown::Shutdown,
        shutdown_complete_tx: mpsc::UnboundedSender<()>,
    ) -> PersistentCacheParentSelector {
        Self {
            config,
            id_generator,
            weights: Arc::new(DashMap::new()),
            connections: Arc::new(DashMap::new()),
            shutdown,
            _shutdown_complete: shutdown_complete_tx,
        }
    }

    /// Picks one persistent cache parent at random, weighted by its host's
    /// current weight.
    ///
    /// A parent gets weight 0 if it has no host info or its host has no
    /// recorded weight. If no weighted distribution can be built (for example,
    /// every weight is 0), a parent is chosen uniformly at random instead.
    ///
    /// # Panics
    ///
    /// Panics if `parents` is empty.
    #[instrument(skip_all)]
    pub fn select(&self, mut parents: Vec<CollectedParent>) -> CollectedParent {
        let weights: Vec<u64> = parents
            .iter()
            .map(|parent| {
                let Some(parent_host) = parent.host.as_ref() else {
                    warn!(
                        "persistent cache parent {} has no host info, defaulting weight to 0",
                        parent.id
                    );
                    return 0;
                };

                self.weights
                    .get(&parent_host.id)
                    .map(|weight| *weight)
                    .unwrap_or_else(|| {
                        debug!(
                            "no weight info for persistent cache parent {} {}, defaulting weight to 0",
                            parent.id, parent_host.id
                        );
                        0
                    })
            })
            .collect();

        match WeightedIndex::new(weights) {
            Ok(dist) => {
                let index = dist.sample(&mut rand::rng());
                // `parents` is owned, so move the winner out instead of cloning it.
                let selected_parent = parents.swap_remove(index);
                debug!("selected persistent cache parent {}", selected_parent.id);
                selected_parent
            }
            // Reached on empty input, all-zero weights, or an overflowing weight
            // sum. `fastrand::usize(..0)` panics, which is the documented
            // empty-input panic.
            Err(_) => {
                let index = fastrand::usize(..parents.len());
                parents.swap_remove(index)
            }
        }
    }

    /// Registers `parents` and starts syncing network info for each newly
    /// seen parent host.
    ///
    /// If a host already has a connection, only its request count is
    /// incremented. Otherwise this method:
    /// - creates an upload client,
    /// - seeds the host's weight from its reported network info,
    /// - spawns a task that streams `SyncHost` updates into the weight table
    ///   until the connection or the daemon is shut down.
    ///
    /// A parent is logged and skipped if it has no host info, an unparsable
    /// IP, an out-of-range port, or an unreachable upload server. One bad
    /// parent therefore never prevents the others from being registered.
    ///
    /// Registrations are reference-counted per host; release them with
    /// `unregister` using the same parents.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`. Per-parent failures are logged
    /// instead of returned.
    #[instrument(skip_all)]
    pub async fn register(&self, parents: &[PersistentCachePeer]) -> Result<()> {
        let dfdaemon_shutdown = self.shutdown.clone();
        let mut join_set = JoinSet::new();
        for parent in parents {
            debug!("register persistent cache parent {}", parent.id);
            let Some(parent_host) = parent.host.as_ref() else {
                warn!(
                    "persistent cache parent {} has no host info, skipping",
                    parent.id
                );
                continue;
            };
            let parent_host_id = parent_host.id.clone();

            // Fast path: this host is already being synced.
            if let Some(connection) = self.connections.get(&parent_host_id) {
                connection.increment_request();
                continue;
            }

            // Skip instead of returning early with `?`. An early return would
            // drop `join_set`, which aborts the sync tasks already spawned for
            // earlier parents while their connections stay registered.
            let ip = match IpAddr::from_str(&parent_host.ip) {
                Ok(ip) => ip,
                Err(err) => {
                    error!(
                        "invalid ip {} of persistent cache parent {}: {}",
                        parent_host.ip, parent_host_id, err
                    );
                    continue;
                }
            };
            // Checked conversion: `as u16` would silently wrap an out-of-range port.
            let port = match u16::try_from(parent_host.port) {
                Ok(port) => port,
                Err(err) => {
                    error!(
                        "invalid port {} of persistent cache parent {}: {}",
                        parent_host.port, parent_host_id, err
                    );
                    continue;
                }
            };

            let dfdaemon_upload_client = match DfdaemonUploadClient::new(
                self.config.clone(),
                format_url("http", ip, port),
                false,
            )
            .await
            {
                Ok(client) => client,
                Err(err) => {
                    error!("failed to connect to parent {}: {}", parent_host_id, err);
                    continue;
                }
            };

            let weight = match parent_host.network.as_ref() {
                Some(network) => Self::calculate_weight_by_network(network),
                None => DEFAULT_NETWORK_WEIGHT,
            };
            self.weights.entry(parent_host_id.clone()).or_insert(weight);

            match self.connections.entry(parent_host_id.clone()) {
                // Another caller registered this host while we were connecting:
                // share its connection and drop the client created above.
                dashmap::mapref::entry::Entry::Occupied(entry) => {
                    entry.get().increment_request();
                }
                dashmap::mapref::entry::Entry::Vacant(entry) => {
                    let connection = Connection::new();
                    connection.increment_request();
                    let shutdown = connection.shutdown.clone();
                    // `insert` returns a guard that is dropped at the end of
                    // this statement, releasing the shard lock before spawning.
                    entry.insert(connection);

                    join_set.spawn(
                        Self::sync_host(
                            self.id_generator.host_id(),
                            self.id_generator.peer_id(),
                            parent_host_id,
                            self.weights.clone(),
                            dfdaemon_upload_client,
                            shutdown,
                            dfdaemon_shutdown.clone(),
                        )
                        .in_current_span(),
                    );
                }
            }
        }

        // Reap the sync tasks in the background so `register` returns once the
        // connections are set up; there is nothing to reap if none were spawned.
        if !join_set.is_empty() {
            tokio::spawn(async move {
                while let Some(message) = join_set.join_next().await {
                    match message {
                        Ok(Ok(_)) => debug!("sync host info completed"),
                        Ok(Err(err)) => error!("sync host info failed: {}", err),
                        Err(err) => error!("task join error: {}", err),
                    }
                }
            });
        }

        Ok(())
    }

    /// Releases one registration for each of `parents`.
    ///
    /// When a host's request count reaches zero, its connection is removed,
    /// its sync task is signalled to stop, and its weight is forgotten.
    /// Parents without host info or without a connection are ignored.
    #[instrument(skip_all)]
    pub fn unregister(&self, parents: &[PersistentCachePeer]) {
        for parent in parents {
            debug!("unregister persistent cache parent {}", parent.id);
            let Some(parent_host) = parent.host.as_ref() else {
                warn!(
                    "persistent cache parent {} has no host info, skipping",
                    parent.id
                );
                continue;
            };
            let parent_host_id = parent_host.id.clone();

            // Decrement and remove under the shard's write lock. A concurrent
            // `register` can then never bump the count between the zero check
            // and the removal and have its connection torn down underneath it.
            let removed = self
                .connections
                .remove_if(&parent_host_id, |_, connection| {
                    connection.decrement_request();
                    connection.active_requests() == 0
                });

            if let Some((_, connection)) = removed {
                debug!("cleaning up parent {} connection", parent_host_id);
                connection.shutdown();
                self.weights.remove(&parent_host_id);
            }
        }
    }

    /// Streams `SyncHost` updates from a persistent cache parent and stores
    /// the weight derived from each message's network info.
    ///
    /// Returns `Ok(())` when the stream ends or either shutdown signal fires.
    /// Returns an error if opening the stream or reading from it fails.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all)]
    async fn sync_host(
        host_id: String,
        peer_id: String,
        parent_host_id: String,
        weights: Arc<DashMap<String, u64>>,
        dfdaemon_upload_client: DfdaemonUploadClient,
        mut shutdown: Shutdown,
        mut dfdaemon_shutdown: Shutdown,
    ) -> Result<()> {
        debug!(
            "sync host info from persistent cache parent {}",
            parent_host_id
        );
        let response = dfdaemon_upload_client
            .sync_host(SyncHostRequest { host_id, peer_id })
            .await
            .inspect_err(|err| {
                error!(
                    "sync host info from persistent cache parent {} failed: {}",
                    parent_host_id, err
                );
            })?;

        let out_stream = response.into_inner();
        tokio::pin!(out_stream);
        loop {
            tokio::select! {
                result = out_stream.try_next() => {
                    match result.inspect_err(|err| {
                        error!("sync host info from persistent cache parent {} failed: {}", parent_host_id, err);
                    })? {
                        Some(message) => {
                            let weight = match message.network.as_ref() {
                                Some(network) => Self::calculate_weight_by_network(network),
                                None => DEFAULT_NETWORK_WEIGHT,
                            };
                            debug!("update host {} weight to {}", parent_host_id, weight);
                            weights.insert(parent_host_id.clone(), weight);
                        }
                        None => break,
                    }
                }
                _ = shutdown.recv() => {
                    debug!("sync host info from persistent cache parent {} shutting down", parent_host_id);
                    break;
                }
                _ = dfdaemon_shutdown.recv() => {
                    debug!("persistent cache parent selector shutting down");
                    break;
                }
            }
        }

        Ok(())
    }

    /// Weight = unused transmit bandwidth (`max_tx_bandwidth - tx_bandwidth()`),
    /// floored at 10% of `max_tx_bandwidth`.
    ///
    /// The floor keeps a busy host selectable. Returns
    /// `DEFAULT_NETWORK_WEIGHT` when `max_tx_bandwidth` is 0.
    fn calculate_weight_by_network(network: &Network) -> u64 {
        let tx_bw = network.tx_bandwidth();
        let max_bw = network.max_tx_bandwidth;
        if max_bw == 0 {
            return DEFAULT_NETWORK_WEIGHT;
        }

        max_bw.saturating_sub(tx_bw).max(max_bw / 10)
    }
}