#[cfg(all(target_os = "linux", target_env = "gnu"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
use anyhow::Context;
use clap::{Parser, Subcommand};
use freenet::{
config::{Config, ConfigArgs, GlobalExecutor},
local_node::{Executor, NodeConfig, OperationMode},
run_local_node, run_network_node,
server::serve_client_api,
};
use std::sync::Arc;
mod commands;
use commands::{service::ServiceCommand, uninstall::UninstallCommand, update::UpdateCommand};
#[derive(Parser, Debug)]
#[command(name = "freenet")]
#[command(about, long_about = None)]
struct Cli {
#[command(subcommand)]
command: Option<Command>,
#[command(flatten)]
config: ConfigArgs,
}
#[derive(Subcommand, Debug)]
enum Command {
Network {
#[command(flatten)]
config: ConfigArgs,
},
Local {
#[command(flatten)]
config: ConfigArgs,
},
#[command(subcommand)]
Service(ServiceCommand),
Update(UpdateCommand),
Uninstall(UninstallCommand),
}
mod build_info {
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
pub const GIT_COMMIT: &str = env!("GIT_COMMIT_HASH");
pub const GIT_DIRTY: &str = env!("GIT_DIRTY");
pub const BUILD_TIMESTAMP: &str = env!("BUILD_TIMESTAMP");
}
async fn run(config: Config) -> anyhow::Result<()> {
tracing::info!(
version = build_info::VERSION,
git_commit = %format!("{}{}", build_info::GIT_COMMIT, build_info::GIT_DIRTY),
build_timestamp = build_info::BUILD_TIMESTAMP,
"Freenet node starting"
);
match config.mode {
OperationMode::Local => run_local(config).await,
OperationMode::Network => run_network(config).await,
}
}
async fn run_local(config: Config) -> anyhow::Result<()> {
tracing::info!("Starting freenet node in local mode");
let socket = config.ws_api.clone();
let executor = Executor::from_config_local(Arc::new(config))
.await
.map_err(anyhow::Error::msg)?;
run_local_node(executor, socket)
.await
.map_err(anyhow::Error::msg)
}
async fn run_network(config: Config) -> anyhow::Result<()> {
tracing::info!("Starting freenet node in network mode");
check_for_existing_process(&config)?;
let clients = serve_client_api(config.ws_api.clone())
.await
.with_context(|| "failed to start HTTP/WebSocket client API")?;
tracing::info!("Initializing node configuration");
let node_config = NodeConfig::new(config)
.await
.with_context(|| "failed while loading node config")?;
let node = node_config
.build(clients)
.await
.with_context(|| "failed while building the node")?;
let shutdown_handle = node.shutdown_handle();
run_network_node_with_signals(node, shutdown_handle).await
}
async fn run_network_node_with_signals(
node: freenet::Node,
shutdown_handle: freenet::ShutdownHandle,
) -> anyhow::Result<()> {
use commands::auto_update::{
UpdateCheckResult, UpdateNeededError, check_if_update_available, clear_version_mismatch,
get_open_connection_count, has_reached_max_backoff, has_version_mismatch, reset_backoff,
version_mismatch_generation,
};
use freenet::transport::{clear_urgent_update, get_highest_seen_version, is_urgent_update};
use tokio::signal;
#[cfg(unix)]
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
.context("failed to install SIGTERM handler")?;
let signal_task = {
let shutdown_handle = shutdown_handle.clone();
GlobalExecutor::spawn(async move {
#[cfg(unix)]
let shutdown_reason = tokio::select! {
_ = signal::ctrl_c() => "received SIGINT (Ctrl+C)",
_ = sigterm.recv() => "received SIGTERM",
};
#[cfg(not(unix))]
let shutdown_reason = {
let _ = signal::ctrl_c().await;
"received SIGINT (Ctrl+C)"
};
tracing::info!(reason = shutdown_reason, "Initiating graceful shutdown");
shutdown_handle.shutdown().await;
})
};
#[cfg(all(unix, feature = "jemalloc-prof"))]
let heap_dump_task = {
let mut sigusr1 = signal::unix::signal(signal::unix::SignalKind::user_defined1())
.context("failed to install SIGUSR1 handler")?;
GlobalExecutor::spawn(async move {
loop {
sigusr1.recv().await;
let timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S");
let path = format!("/tmp/freenet-heap.{timestamp}.heap");
tracing::info!(%path, "SIGUSR1 received, dumping heap profile");
match std::ffi::CString::new(path.as_str()) {
Ok(c_path) => {
let ptr: *const libc::c_char = c_path.as_ptr();
let result = unsafe { tikv_jemalloc_ctl::raw::write(b"prof.dump\0", ptr) };
match result {
Ok(()) => tracing::info!(%path, "Heap profile dumped"),
Err(e) => tracing::error!(error = %e, "Failed to dump heap profile"),
}
}
Err(e) => tracing::error!(error = %e, "Invalid heap dump path"),
}
}
})
};
let (update_tx, mut update_rx) = tokio::sync::oneshot::channel::<String>();
let auto_update_disabled = !build_info::GIT_DIRTY.is_empty();
let update_check_task = GlobalExecutor::spawn(async move {
use std::time::Instant;
if auto_update_disabled {
tracing::info!(
git_dirty = build_info::GIT_DIRTY,
"Auto-update disabled: this is a dirty (locally modified) build. \
Run `freenet update` manually if needed."
);
std::future::pending::<()>().await;
return;
}
fn parse_our_version() -> Option<(u8, u8, u16)> {
let parts: Vec<&str> = build_info::VERSION.split('.').collect();
if parts.len() < 3 {
return None;
}
let major: u8 = parts[0].parse().ok()?;
let minor: u8 = parts[1].parse().ok()?;
let patch_str = parts[2].split('-').next()?;
let patch: u16 = patch_str.parse().ok()?;
Some((major, minor, patch))
}
const HARD_EXIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(6 * 3600);
const MAX_STAGGER_SECS: u64 = 4 * 3600;
const STAGGER_COOLDOWN: std::time::Duration = std::time::Duration::from_secs(24 * 3600);
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
let mut last_mismatch_generation = version_mismatch_generation();
let mut isolated_mismatch_since: Option<Instant> = None;
let mut stagger_deadline: Option<Instant> = None;
let mut stagger_cooldown_until: Option<Instant> = None;
let our_version = parse_our_version();
loop {
interval.tick().await;
if is_urgent_update() {
tracing::warn!("Urgent update needed (remote's min_compatible > our version)");
match check_if_update_available(build_info::VERSION).await {
UpdateCheckResult::UpdateAvailable(new_version) => {
clear_version_mismatch();
clear_urgent_update();
tracing::info!(
new_version = %new_version,
"Urgent update confirmed on GitHub, triggering immediate auto-update"
);
#[allow(clippy::let_underscore_must_use)]
let _ = update_tx.send(new_version);
return;
}
UpdateCheckResult::Skipped => {
}
}
}
if let (Some(our_ver), Some(highest)) = (our_version, get_highest_seen_version()) {
if highest > our_ver {
let in_cooldown =
stagger_cooldown_until.is_some_and(|until| Instant::now() < until);
if stagger_deadline.is_none() && !in_cooldown {
let stagger_secs =
freenet::config::GlobalRng::random_u64() % MAX_STAGGER_SECS;
let deadline =
Instant::now() + std::time::Duration::from_secs(stagger_secs);
tracing::info!(
highest_seen = %format!("{}.{}.{}", highest.0, highest.1, highest.2),
our_version = build_info::VERSION,
stagger_secs,
"Newer version discovered via peer handshake, stagger timer started"
);
stagger_deadline = Some(deadline);
}
if let Some(deadline) = stagger_deadline {
if Instant::now() >= deadline {
tracing::info!("Stagger timer expired, checking GitHub for updates");
match check_if_update_available(build_info::VERSION).await {
UpdateCheckResult::UpdateAvailable(new_version) => {
clear_version_mismatch();
clear_urgent_update();
tracing::info!(
new_version = %new_version,
"Update confirmed on GitHub after stagger, triggering auto-update"
);
#[allow(clippy::let_underscore_must_use)]
let _ = update_tx.send(new_version);
return;
}
UpdateCheckResult::Skipped => {
stagger_deadline = None;
stagger_cooldown_until =
Some(Instant::now() + STAGGER_COOLDOWN);
}
}
}
}
}
}
let current_generation = version_mismatch_generation();
if current_generation != last_mismatch_generation {
last_mismatch_generation = current_generation;
reset_backoff();
tracing::info!(
generation = current_generation,
"Fresh version mismatch — reset update check backoff"
);
}
if has_version_mismatch() {
let open_connections = get_open_connection_count();
if open_connections == 0 {
isolated_mismatch_since.get_or_insert_with(Instant::now);
} else {
isolated_mismatch_since = None;
}
if let Some(since) = isolated_mismatch_since {
if since.elapsed() > HARD_EXIT_TIMEOUT {
tracing::error!(
isolated_secs = since.elapsed().as_secs(),
"Isolated with version mismatch >6h — forcing exit for auto-update"
);
clear_version_mismatch();
#[allow(clippy::let_underscore_must_use)]
let _ = update_tx.send("unknown (hard timeout)".to_string());
return;
}
}
tracing::info!("Version mismatch detected, checking GitHub for updates...");
match check_if_update_available(build_info::VERSION).await {
UpdateCheckResult::UpdateAvailable(new_version) => {
clear_version_mismatch();
tracing::info!(
new_version = %new_version,
"Newer version confirmed on GitHub, triggering auto-update"
);
#[allow(clippy::let_underscore_must_use)]
let _ = update_tx.send(new_version);
return;
}
UpdateCheckResult::Skipped if has_reached_max_backoff() => {
let open_connections = get_open_connection_count();
if open_connections == 0 {
tracing::warn!(
"Max backoff + 0 connections — \
trusting gateway version signal, exiting for auto-update"
);
clear_version_mismatch();
#[allow(clippy::let_underscore_must_use)]
let _ = update_tx.send("unknown (gateway mismatch)".to_string());
return;
}
tracing::info!(
open_connections,
"Max backoff reached but node has connections — \
clearing version mismatch flag"
);
clear_version_mismatch();
}
UpdateCheckResult::Skipped => {}
}
} else {
isolated_mismatch_since = None;
}
}
});
let result = tokio::select! {
r = run_network_node(node) => r,
new_version = &mut update_rx => {
match new_version {
Ok(version) => {
tracing::info!(version = %version, "Initiating graceful shutdown for auto-update");
shutdown_handle.shutdown().await;
Err(UpdateNeededError { new_version: version }.into())
}
Err(_) => {
Ok(())
}
}
}
};
signal_task.abort();
update_check_task.abort();
#[cfg(all(unix, feature = "jemalloc-prof"))]
heap_dump_task.abort();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
if result.is_ok() {
tracing::info!("Graceful shutdown complete");
}
result
}
const EXIT_CODE_ALREADY_RUNNING: i32 = 43;
#[derive(Debug, thiserror::Error)]
#[error("another freenet instance is already running")]
struct AlreadyRunningError;
fn check_for_existing_process(config: &Config) -> anyhow::Result<()> {
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;
let addr = SocketAddr::from((config.ws_api.address, config.ws_api.port));
if TcpStream::connect_timeout(&addr, Duration::from_millis(500)).is_ok() {
let pid = find_process_on_port(config.ws_api.port);
if let Some(pid) = pid {
tracing::warn!(
port = config.ws_api.port,
pid = pid,
"Another process (PID {pid}) is already listening on port {}. \
If freenet is installed as a service, use 'freenet service stop' before \
running manually. Otherwise use 'kill {pid}' to stop it.",
config.ws_api.port
);
} else {
tracing::warn!(
port = config.ws_api.port,
"Port {} is already in use by another process. \
If freenet is installed as a service, use 'freenet service stop' before \
running manually.",
config.ws_api.port
);
}
return Err(AlreadyRunningError.into());
}
Ok(())
}
fn find_process_on_port(port: u16) -> Option<u32> {
#[cfg(target_os = "linux")]
{
let port_hex = format!("{:04X}", port);
let target_inode = find_listening_inode("/proc/net/tcp", &port_hex)
.or_else(|| find_listening_inode("/proc/net/tcp6", &port_hex))?;
let expected_link = format!("socket:[{target_inode}]");
let proc_dir = std::fs::read_dir("/proc").ok()?;
for entry in proc_dir.filter_map(|e| e.ok()) {
let pid_str = entry.file_name();
let pid_str = pid_str.to_string_lossy();
if !pid_str.chars().all(|c| c.is_ascii_digit()) {
continue;
}
let fd_dir = format!("/proc/{pid_str}/fd");
if let Ok(fds) = std::fs::read_dir(&fd_dir) {
for fd in fds.filter_map(|e| e.ok()) {
if let Ok(link) = std::fs::read_link(fd.path()) {
if link.to_string_lossy() == expected_link {
return pid_str.parse().ok();
}
}
}
}
}
None
}
#[cfg(not(target_os = "linux"))]
{
let _ = port;
None
}
}
#[cfg(target_os = "linux")]
fn find_listening_inode(proc_path: &str, port_hex: &str) -> Option<String> {
let contents = std::fs::read_to_string(proc_path).ok()?;
parse_listening_inode(&contents, port_hex)
}
#[cfg(target_os = "linux")]
fn parse_listening_inode(contents: &str, port_hex: &str) -> Option<String> {
for line in contents.lines().skip(1) {
let fields: Vec<&str> = line.split_whitespace().collect();
if fields.len() < 10 {
continue;
}
if fields[3] == "0A" {
if let Some(addr_port) = fields[1].rsplit_once(':') {
if addr_port.1 == port_hex {
return Some(fields[9].to_string());
}
}
}
}
None
}
fn run_node(config_args: ConfigArgs) -> anyhow::Result<()> {
if config_args.version {
println!(
"Freenet version: {} ({}{})",
config_args.current_version(),
build_info::GIT_COMMIT,
build_info::GIT_DIRTY
);
println!("Build timestamp: {}", build_info::BUILD_TIMESTAMP);
return Ok(());
}
let max_blocking_threads = config_args.max_blocking_threads.unwrap_or_else(|| {
std::thread::available_parallelism()
.map(|n| (n.get() * 2).clamp(4, 32))
.unwrap_or(8)
});
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(
std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1),
)
.max_blocking_threads(max_blocking_threads)
.thread_name("freenet-main")
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
let config = config_args.build().await?;
freenet::config::set_logger(None, None, config.paths().log_dir());
tracing::info!(
max_blocking_threads,
"Tokio runtime configured with bounded blocking thread pool"
);
run(config).await
})?;
Ok(())
}
fn freenet_main() -> anyhow::Result<()> {
let cli = Cli::parse();
match cli.command {
Some(Command::Service(cmd)) => {
let rt = tokio::runtime::Runtime::new()?;
let config = rt.block_on(cli.config.build())?;
cmd.run(
build_info::VERSION,
build_info::GIT_COMMIT,
build_info::GIT_DIRTY,
build_info::BUILD_TIMESTAMP,
config.paths(),
)
}
Some(Command::Update(cmd)) => cmd.run(build_info::VERSION),
Some(Command::Uninstall(cmd)) => cmd.run(),
Some(Command::Network { mut config }) => {
config.mode = Some(OperationMode::Network);
run_node(config)
}
Some(Command::Local { mut config }) => {
config.mode = Some(OperationMode::Local);
run_node(config)
}
None => {
if cli.config.version {
return run_node(cli.config);
}
if commands::setup_wizard::maybe_show_setup_wizard()? {
return Ok(());
}
run_node(cli.config)
}
}
}
fn main() {
use commands::auto_update::{EXIT_CODE_UPDATE_NEEDED, UpdateNeededError};
match freenet_main() {
Ok(()) => std::process::exit(0),
Err(e) => {
if e.downcast_ref::<UpdateNeededError>().is_some() {
eprintln!("Update needed, exiting for service wrapper to handle update...");
std::process::exit(EXIT_CODE_UPDATE_NEEDED);
}
if e.downcast_ref::<AlreadyRunningError>().is_some() {
eprintln!(
"Another freenet instance is already running. \
Exiting without error to avoid restart loop."
);
std::process::exit(EXIT_CODE_ALREADY_RUNNING);
}
eprintln!("Error: {e:?}");
std::process::exit(1);
}
}
}
#[cfg(test)]
mod tests {
#[cfg(target_os = "linux")]
use super::parse_listening_inode;
#[test]
#[cfg(target_os = "linux")]
fn test_parse_listening_inode_ipv4() {
let content = "\
sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
0: 00000000:1D55 00000000:0000 0A 00000000:00000000 00:00000000 00000000 1000 0 54321 1 0000000000000000 100 0 0 10 0
1: 0100007F:0035 00000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 11111 1 0000000000000000 100 0 0 10 0
2: 00000000:1D55 0100007F:E234 01 00000000:00000000 00:00000000 00000000 1000 0 99999 1 0000000000000000 100 0 0 10 0";
assert_eq!(
parse_listening_inode(content, "1D55"),
Some("54321".to_string())
);
assert_eq!(
parse_listening_inode(content, "0035"),
Some("11111".to_string())
);
assert_eq!(parse_listening_inode(content, "1F90"), None);
}
#[test]
#[cfg(target_os = "linux")]
fn test_parse_listening_inode_ipv6() {
let content = "\
sl local_address remote_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
0: 00000000000000000000000000000000:1D55 00000000000000000000000000000000:0000 0A 00000000:00000000 00:00000000 00000000 1000 0 67890 1 0000000000000000 100 0 0 10 0";
assert_eq!(
parse_listening_inode(content, "1D55"),
Some("67890".to_string())
);
}
#[test]
#[cfg(target_os = "linux")]
fn test_parse_listening_inode_short_line() {
let content = "\
sl local_address rem_address st
0: 00000000:1D55 00000000:0000 0A";
assert_eq!(parse_listening_inode(content, "1D55"), None);
}
}