use clap::Parser;
use dragonfly_client::announcer::SchedulerAnnouncer;
use dragonfly_client::dynconfig::Dynconfig;
use dragonfly_client::gc::GC;
use dragonfly_client::grpc::{
dfdaemon_download::DfdaemonDownloadServer, dfdaemon_upload::DfdaemonUploadServer,
manager::ManagerClient, scheduler::SchedulerClient,
};
use dragonfly_client::health::Health;
use dragonfly_client::proxy::Proxy;
use dragonfly_client::resource::{
persistent_cache_task::PersistentCacheTask, persistent_task::PersistentTask, task::Task,
};
use dragonfly_client::stats::Stats;
use dragonfly_client::tracing::init_tracing;
use dragonfly_client_backend::BackendFactory;
use dragonfly_client_config::{dfdaemon, VersionValueParser};
use dragonfly_client_metric::Metrics;
use dragonfly_client_storage::{server::quic::QUICServer, server::tcp::TCPServer, Storage};
use dragonfly_client_util::{
container::is_running_in_container, id_generator::IDGenerator, ratelimiter::bbr::BBR, shutdown,
sysinfo::SystemMonitor,
};
use leaky_bucket::RateLimiter;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use termion::{color, style};
use tokio::sync::mpsc;
use tokio::sync::Barrier;
use tracing::{error, info, Level};
/// Process-wide memory allocator: jemalloc (via `tikv_jemallocator`).
///
/// The declaration is compiled out when `target_env = "msvc"`, in which case
/// the default allocator is used instead.
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
/// jemalloc option string, exported under the symbol name `malloc_conf`.
///
/// The options request heap profiling (`prof:true`), make it active
/// (`prof_active:true`) and set `lg_prof_sample:19`, which per the jemalloc
/// option reference is the base-2 logarithm of the average number of bytes
/// between allocation samples (2^19 bytes). The trailing `\0` terminates the
/// bytes as a C string.
///
/// NOTE(review): unlike `ALLOC`, this static is not gated on
/// `target_env = "msvc"`, and whether jemalloc actually reads this symbol
/// depends on the symbol prefix the linked jemalloc was built with - confirm.
// The lowercase name mirrors the exported symbol name, hence the lint allowance.
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
// Command line arguments of the dfdaemon binary, parsed with clap's derive API.
//
// Plain `//` comments are used on purpose throughout this struct: clap's
// derive reads `///` doc comments as help text, so doc comments here could
// leak into the generated CLI help.
//
// clap's built-in version flag is disabled (`disable_version_flag = true`);
// the `version` field below defines `-V/--version` itself.
#[derive(Debug, Parser)]
#[command(
name = dfdaemon::NAME,
author,
version,
about = "dfdaemon is a high performance P2P download daemon",
long_about = "A high performance P2P download daemon in Dragonfly that can download resources of different protocols. \
When user triggers a file downloading task, dfdaemon will download the pieces of file from other peers. \
Meanwhile, it will act as an uploader to support other peers to download pieces from it if it owns them.",
disable_version_flag = true
)]
struct Args {
// Path of the config file (`-c/--config` or `DFDAEMON_CONFIG`); the default
// comes from `dfdaemon::default_dfdaemon_config_path()`.
#[arg(
short = 'c',
long = "config",
default_value_os_t = dfdaemon::default_dfdaemon_config_path(),
env = "DFDAEMON_CONFIG",
help = "Specify config file to use")
]
config: PathBuf,
// Logging level (`-l/--log-level` or `DFDAEMON_LOG_LEVEL`), parsed into a
// `tracing::Level`; defaults to "info".
#[arg(
short = 'l',
long,
default_value = "info",
env = "DFDAEMON_LOG_LEVEL",
help = "Specify the logging level [trace, debug, info, warn, error]"
)]
log_level: Level,
// Log directory (`--log-dir` or `DFDAEMON_LOG_DIR`); the default comes from
// `dfdaemon::default_dfdaemon_log_dir()`.
#[arg(
long,
default_value_os_t = dfdaemon::default_dfdaemon_log_dir(),
env = "DFDAEMON_LOG_DIR",
help = "Specify the log directory"
)]
log_dir: PathBuf,
// Maximum number of log files (`--log-max-files` or `DFDAEMON_LOG_MAX_FILES`);
// defaults to 6.
#[arg(
long,
default_value_t = 6,
env = "DFDAEMON_LOG_MAX_FILES",
help = "Specify the max number of log files"
)]
log_max_files: usize,
// Whether to print logs (`--console` or `DFDAEMON_CONSOLE`); defaults to true.
// NOTE(review): because the default is already true, passing `--console` on
// the command line presumably cannot turn it off; only the environment
// variable could - confirm against clap's bool flag handling.
#[arg(
long,
default_value_t = true,
env = "DFDAEMON_CONSOLE",
help = "Specify whether to print log"
)]
console: bool,
// Custom `-V/--version` flag handled through `VersionValueParser`; what the
// parser does when the flag is set is defined outside this file.
#[arg(
short = 'V',
long = "version",
help = "Print version information",
default_value_t = false,
action = clap::ArgAction::SetTrue,
value_parser = VersionValueParser
)]
version: bool,
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let args = Args::parse();
let config = match dfdaemon::Config::load(&args.config).await {
Ok(config) => config,
Err(err) => {
println!(
"{}{}Load config {} error: {}{}\n",
color::Fg(color::Red),
style::Bold,
args.config.display(),
err,
style::Reset
);
println!(
"{}{}If the file does not exist, you need to new a default config file refer to: {}{}{}{}https://d7y.io/docs/next/reference/configuration/client/dfdaemon/{}",
color::Fg(color::Yellow),
style::Bold,
style::Reset,
color::Fg(color::Cyan),
style::Underline,
style::Italic,
style::Reset,
);
std::process::exit(1);
}
};
let config = Arc::new(config);
let _guards = init_tracing(
dfdaemon::NAME,
args.log_dir.clone(),
args.log_level,
args.log_max_files,
config.tracing.protocol.clone(),
config.tracing.endpoint.clone(),
config.tracing.path.clone(),
Some(config.tracing.headers.clone()),
Some(config.host.clone()),
config.seed_peer.enable,
args.console,
);
let storage = Storage::new(config.clone(), config.storage.dir.as_path(), args.log_dir)
.await
.inspect_err(|err| {
error!("initialize storage failed: {}", err);
})?;
let storage = Arc::new(storage);
let id_generator = IDGenerator::new(
config.host.ip.unwrap().to_string(),
config.host.hostname.clone(),
config.seed_peer.enable,
);
let id_generator = Arc::new(id_generator);
let manager_client = ManagerClient::new(config.clone(), config.manager.addr.clone())
.await
.inspect_err(|err| {
error!("initialize manager client failed: {}", err);
})?;
let manager_client = Arc::new(manager_client);
let shutdown = shutdown::Shutdown::default();
let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::unbounded_channel();
let dynconfig = Dynconfig::new(
config.clone(),
manager_client.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
)
.await
.inspect_err(|err| {
error!("initialize dynconfig server failed: {}", err);
})?;
let dynconfig = Arc::new(dynconfig);
let scheduler_client = SchedulerClient::new(config.clone(), dynconfig.clone())
.await
.inspect_err(|err| {
error!("initialize scheduler client failed: {}", err);
})?;
let scheduler_client = Arc::new(scheduler_client);
let backend_factory =
BackendFactory::new(config.clone(), Some(config.server.plugin_dir.as_path())).inspect_err(
|err| {
error!("initialize backend factory failed: {}", err);
},
)?;
let backend_factory = Arc::new(backend_factory);
let download_bandwidth_limiter = Arc::new(
RateLimiter::builder()
.initial(config.download.bandwidth_limit.as_u64() as usize)
.refill(config.download.bandwidth_limit.as_u64() as usize)
.max(config.download.bandwidth_limit.as_u64() as usize)
.interval(Duration::from_secs(1))
.fair(false)
.build(),
);
let upload_bandwidth_limiter = Arc::new(
RateLimiter::builder()
.initial(config.upload.bandwidth_limit.as_u64() as usize)
.refill(config.upload.bandwidth_limit.as_u64() as usize)
.max(config.upload.bandwidth_limit.as_u64() as usize)
.interval(Duration::from_secs(1))
.fair(false)
.build(),
);
let prefetch_bandwidth_limiter = Arc::new(
RateLimiter::builder()
.initial(config.proxy.prefetch_bandwidth_limit.as_u64() as usize)
.refill(config.proxy.prefetch_bandwidth_limit.as_u64() as usize)
.max(config.proxy.prefetch_bandwidth_limit.as_u64() as usize)
.interval(Duration::from_secs(1))
.fair(false)
.build(),
);
let back_to_source_bandwidth_limiter = Arc::new(
RateLimiter::builder()
.initial(config.download.back_to_source_bandwidth_limit.as_u64() as usize)
.refill(config.download.back_to_source_bandwidth_limit.as_u64() as usize)
.max(config.download.back_to_source_bandwidth_limit.as_u64() as usize)
.interval(Duration::from_secs(1))
.fair(false)
.build(),
);
let task = Task::new(
config.clone(),
id_generator.clone(),
storage.clone(),
scheduler_client.clone(),
backend_factory.clone(),
download_bandwidth_limiter.clone(),
prefetch_bandwidth_limiter.clone(),
back_to_source_bandwidth_limiter.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
)?;
let task = Arc::new(task);
let persistent_task = PersistentTask::new(
config.clone(),
id_generator.clone(),
storage.clone(),
scheduler_client.clone(),
backend_factory.clone(),
download_bandwidth_limiter.clone(),
prefetch_bandwidth_limiter.clone(),
back_to_source_bandwidth_limiter.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
)?;
let persistent_task = Arc::new(persistent_task);
let persistent_cache_task = PersistentCacheTask::new(
config.clone(),
id_generator.clone(),
storage.clone(),
scheduler_client.clone(),
backend_factory.clone(),
download_bandwidth_limiter.clone(),
prefetch_bandwidth_limiter.clone(),
back_to_source_bandwidth_limiter.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
)?;
let persistent_cache_task = Arc::new(persistent_cache_task);
let health = Health::new(
SocketAddr::new(config.health.server.ip.unwrap(), config.health.server.port),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
let metrics = Metrics::new(
config.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
let stats = Stats::new(
SocketAddr::new(config.stats.server.ip.unwrap(), config.stats.server.port),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
let mut storage_tcp_server = TCPServer::new(
config.clone(),
SocketAddr::new(
config.storage.server.ip.unwrap(),
config.storage.server.tcp_port,
),
id_generator.clone(),
storage.clone(),
upload_bandwidth_limiter.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
let mut storage_quic_server = QUICServer::new(
SocketAddr::new(
config.storage.server.ip.unwrap(),
config.storage.server.quic_port,
),
id_generator.clone(),
storage.clone(),
upload_bandwidth_limiter.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
let proxy = Proxy::new(
config.clone(),
task.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
let system_monitor = SystemMonitor::new(config.host.ip.unwrap(), config.upload.bandwidth_limit);
let system_monitor = Arc::new(system_monitor);
let scheduler_announcer = SchedulerAnnouncer::new(
config.clone(),
id_generator.host_id(),
scheduler_client.clone(),
system_monitor.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
)
.await
.inspect_err(|err| {
error!("initialize scheduler announcer failed: {}", err);
})?;
let bbr = if let Some(ref adaptive_rate_limit) = config.server.adaptive_rate_limit {
Some(Arc::new(BBR::new(adaptive_rate_limit.clone()).await))
} else {
None
};
let mut dfdaemon_upload_grpc = DfdaemonUploadServer::new(
config.clone(),
SocketAddr::new(config.upload.server.ip.unwrap(), config.upload.server.port),
task.clone(),
persistent_task.clone(),
persistent_cache_task.clone(),
system_monitor.clone(),
bbr.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
let mut dfdaemon_download_grpc = DfdaemonDownloadServer::new(
config.clone(),
dynconfig.clone(),
config.download.server.socket_path.clone(),
task.clone(),
persistent_task.clone(),
persistent_cache_task.clone(),
bbr.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
let gc = GC::new(
config.clone(),
id_generator.host_id(),
storage.clone(),
scheduler_client.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
info!(
"dfdaemon started at pid {}, containerized: {}",
std::process::id(),
is_running_in_container()
);
let grpc_server_started_barrier = Arc::new(Barrier::new(3));
tokio::select! {
_ = tokio::spawn(async move { dynconfig.run().await }) => {
info!("dynconfig manager exited");
},
_ = tokio::spawn(async move { health.run().await }) => {
info!("health server exited");
},
_ = tokio::spawn(async move { metrics.run().await }) => {
info!("metrics server exited");
},
_ = tokio::spawn(async move { stats.run().await }) => {
info!("stats server exited");
},
_ = tokio::spawn(async move { scheduler_announcer.run().await }) => {
info!("announcer scheduler exited");
},
_ = tokio::spawn(async move { gc.run().await }) => {
info!("garbage collector exited");
},
_ = {
tokio::spawn(async move {
storage_tcp_server.run().await.unwrap_or_else(|err| error!("storage tcp server failed: {}", err));
})
} => {
info!("storage tcp server exited");
},
_ = {
tokio::spawn(async move {
storage_quic_server.run().await.unwrap_or_else(|err| error!("storage quic server failed: {}", err));
})
} => {
info!("storage quic server exited");
},
_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
dfdaemon_upload_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon upload grpc server failed: {}", err));
})
} => {
info!("dfdaemon upload grpc server exited");
},
_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
dfdaemon_download_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon download grpc server failed: {}", err));
})
} => {
info!("dfdaemon download grpc unix server exited");
},
_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
proxy.run(barrier).await.unwrap_or_else(|err| error!("proxy server failed: {}", err));
})
} => {
info!("proxy server exited");
},
_ = shutdown::shutdown_signal() => {},
}
shutdown.trigger();
drop(task);
drop(persistent_task);
drop(persistent_cache_task);
drop(scheduler_client);
drop(shutdown_complete_tx);
let _ = shutdown_complete_rx.recv().await;
Ok(())
}