use rayfish::{
DNS_DOMAIN, apply, config, daemon, firewall, identity, invite, ipc, layout, logdir, membership,
onepassword, picker, progress, shutdown, stats, style,
};
use std::sync::{Arc, atomic};
use anyhow::{Context, Result};
use clap::{CommandFactory, Parser, Subcommand};
use membership::GroupMode;
mod cli;
use cli::*;
const FULL_VERSION: &str = concat!(env!("CARGO_PKG_VERSION"), " (", env!("RAY_GIT_SHA"), ")");
#[derive(Parser)]
#[command(
name = "ray",
about = "P2P mesh VPN powered by iroh",
version = FULL_VERSION
)]
struct Cli {
#[arg(long, global = true)]
json: bool,
#[command(subcommand)]
command: Command,
}
static JSON_FLAG: atomic::AtomicBool = atomic::AtomicBool::new(false);
fn json_enabled() -> bool {
JSON_FLAG.load(atomic::Ordering::Relaxed)
}
#[derive(Subcommand)]
pub(crate) enum Command {
#[command(visible_alias = "new")]
Create {
#[arg(long, conflicts_with = "closed")]
open: bool,
#[arg(long)]
closed: bool,
#[arg(long)]
name: Option<String>,
#[arg(long)]
hostname: Option<String>,
#[arg(long)]
tor: bool,
},
Join {
network_key: String,
#[arg(long)]
name: Option<String>,
#[arg(long)]
hostname: Option<String>,
#[arg(long)]
tor: bool,
#[arg(long)]
auto_accept_firewall: bool,
},
#[command(visible_alias = "rm")]
Leave {
name: String,
},
Nuke {
name: String,
#[arg(long)]
force: bool,
},
#[command(visible_aliases = ["st", "ls"])]
Status,
Report,
#[command(hide = true)]
Daemon,
Up {
#[arg(long)]
hostname: Option<String>,
},
Down,
Stop,
Start,
Uninstall,
Install,
Restart,
Completions {
shell: clap_complete::Shell,
},
Invite {
network: String,
#[command(subcommand)]
action: Option<InviteAction>,
},
Requests {
network: String,
},
Accept {
network: String,
id: String,
},
Deny {
network: String,
id: String,
},
Connect {
contact_id: String,
#[arg(long)]
hostname: Option<String>,
},
Connections {
#[command(subcommand)]
action: Option<ConnectionsAction>,
},
Contact {
#[command(subcommand)]
action: Option<ContactAction>,
},
Ping {
peer: String,
#[arg(short, long, default_value_t = 3)]
count: u32,
#[arg(short, long, default_value_t = 1000)]
interval: u64,
},
Netcheck,
Admin {
network: String,
#[command(subcommand)]
action: AdminAction,
},
Firewall {
#[command(subcommand)]
action: FirewallAction,
},
Apply {
spec: Option<String>,
#[arg(long)]
prune: bool,
#[arg(long)]
dry_run: bool,
#[arg(long)]
invite_missing: bool,
#[arg(long, conflicts_with_all = ["spec", "prune", "dry_run", "invite_missing"])]
example: bool,
},
Hostname {
network: String,
name: String,
},
#[command(visible_alias = "whois")]
Identityof {
network: String,
hostname: String,
},
Mdns {
state: String,
},
Config {
#[command(subcommand)]
action: Option<ConfigAction>,
},
SetOperator {
user: String,
},
Send {
file: String,
peer: String,
},
Files {
#[command(subcommand)]
action: Option<FilesAction>,
},
Pair {
#[command(subcommand)]
action: Option<PairAction>,
ticket: Option<String>,
},
#[command(visible_alias = "ver")]
Version,
#[command(visible_alias = "upgrade")]
Update {
#[arg(long)]
force: bool,
#[arg(long)]
check: bool,
#[arg(long, conflicts_with_all = ["list", "version"])]
nightly: bool,
#[arg(long, conflicts_with_all = ["check", "force", "version"])]
list: bool,
#[arg(long, value_name = "VERSION")]
version: Option<String>,
},
}
#[derive(Subcommand)]
pub(crate) enum InviteAction {
Create {
#[arg(long)]
expires: Option<String>,
#[arg(long, conflicts_with = "reusable")]
hostname: Option<String>,
#[arg(long)]
reusable: bool,
#[arg(long)]
qr: bool,
},
#[command(visible_alias = "ls")]
List,
#[command(visible_alias = "rm")]
Revoke {
id: String,
},
}
#[derive(Subcommand)]
pub(crate) enum PairAction {
Accept {
ticket: String,
},
Backup {
#[arg(long = "1password", alias = "op")]
onepassword: bool,
#[arg(long)]
vault: Option<String>,
#[arg(long, default_value = "Rayfish Identity")]
item: String,
},
Restore {
backup: Option<String>,
#[arg(long = "1password", alias = "op")]
onepassword: bool,
#[arg(long)]
vault: Option<String>,
#[arg(long, default_value = "Rayfish Identity")]
item: String,
},
}
#[derive(Subcommand)]
pub(crate) enum AdminAction {
Add {
identity: String,
},
#[command(visible_alias = "ls")]
List,
}
#[derive(Subcommand)]
pub(crate) enum ConnectionsAction {
#[command(visible_alias = "ls")]
List,
#[command(visible_alias = "ok")]
Approve {
id: String,
},
}
#[derive(Subcommand)]
pub(crate) enum ConfigAction {
#[command(visible_alias = "ls")]
Get {
key: Option<String>,
},
Set {
key: String,
value: String,
#[arg(long)]
replace: bool,
},
#[command(visible_alias = "rm")]
Unset {
key: String,
},
}
#[derive(Subcommand)]
pub(crate) enum ContactAction {
Id,
Rotate,
}
#[derive(Subcommand)]
pub(crate) enum FirewallAction {
#[command(visible_alias = "a")]
Add {
direction: String,
action: String,
#[arg(long, short = 'p', default_value = "any")]
proto: String,
#[arg(long, short = 'P')]
port: Option<String>,
#[arg(long)]
peer: Option<String>,
#[arg(long)]
network: Option<String>,
},
#[command(visible_aliases = ["rm", "del"])]
Remove {
index: usize,
},
#[command(visible_aliases = ["ls", "list"])]
Show,
Default {
action: String,
},
Reject {
state: String,
},
Suggest {
network: String,
#[arg(long)]
subject: String,
#[arg(long, value_name = "[PEER:]SPEC")]
allow: Vec<String>,
#[arg(long, value_name = "[PEER:]SPEC")]
deny: Vec<String>,
},
Pending {
network: String,
},
Accept {
network: String,
},
Deny {
network: String,
},
AutoAccept {
network: String,
state: String,
},
Ssh {
#[command(subcommand)]
action: SshAction,
},
}
#[derive(Subcommand)]
pub(crate) enum SshAction {
On,
Off,
#[command(visible_alias = "ok")]
Allow {
network: String,
peer: String,
#[arg(long = "user", short = 'u', value_delimiter = ',')]
user: Vec<String>,
},
#[command(visible_aliases = ["rm", "del"])]
Deny {
network: String,
peer: String,
},
#[command(visible_aliases = ["ls", "list"])]
Show {
network: Option<String>,
},
}
#[derive(Subcommand)]
pub(crate) enum FilesAction {
Accept {
id: u64,
#[arg(long, short)]
output: Option<String>,
},
}
fn check_root() {
if unsafe { libc::geteuid() } != 0 {
eprintln!("rayfish requires root privileges to create TUN devices. Run with sudo.");
std::process::exit(1);
}
}
#[derive(Default)]
struct LogGuard {
_appender: Option<tracing_appender::non_blocking::WorkerGuard>,
#[cfg(feature = "otel")]
otel_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
}
impl Drop for LogGuard {
fn drop(&mut self) {
#[cfg(feature = "otel")]
if let Some(provider) = self.otel_provider.take() {
let _ = provider.shutdown();
}
}
}
fn init_tracing(to_file: bool) -> LogGuard {
use tracing_subscriber::prelude::*;
let global_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info,rayfish=debug"));
let console_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
let console_layer = tracing_subscriber::fmt::layer().with_filter(console_filter);
let (file_layer, appender_guard) = if to_file {
match std::fs::create_dir_all(logdir::log_dir()) {
Ok(()) => {
match tracing_appender::rolling::Builder::new()
.rotation(tracing_appender::rolling::Rotation::DAILY)
.filename_prefix("rayfish.log")
.max_log_files(7)
.build(logdir::log_dir())
{
Ok(appender) => {
let (writer, guard) = tracing_appender::non_blocking(appender);
let layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(writer);
(Some(layer), Some(guard))
}
Err(e) => {
eprintln!(
"warning: cannot build rolling log appender: {e} (file logging disabled)"
);
(None, None)
}
}
}
Err(e) => {
eprintln!(
"warning: cannot create log directory {}: {e} (file logging disabled)",
logdir::log_dir().display()
);
(None, None)
}
}
} else {
(None, None)
};
let mut guard = LogGuard {
_appender: appender_guard,
#[cfg(feature = "otel")]
otel_provider: None,
};
let otel_layer = build_otel_layer(&mut guard);
tracing_subscriber::registry()
.with(global_filter)
.with(console_layer)
.with(file_layer)
.with(otel_layer)
.init();
guard
}
#[cfg(feature = "otel")]
fn build_otel_layer<S>(
guard: &mut LogGuard,
) -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync>>
where
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Send + Sync,
{
use opentelemetry::trace::TracerProvider as _;
use tracing_subscriber::Layer as _;
if std::env::var_os("OTEL_EXPORTER_OTLP_ENDPOINT").is_none()
&& std::env::var_os("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT").is_none()
{
return None;
}
let exporter = match opentelemetry_otlp::SpanExporter::builder()
.with_http()
.build()
{
Ok(e) => e,
Err(e) => {
eprintln!("otel: failed to build OTLP exporter: {e}");
return None;
}
};
let resource = opentelemetry_sdk::Resource::builder()
.with_service_name("rayfish")
.build();
let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.with_resource(resource)
.build();
let tracer = provider.tracer("rayfish");
guard.otel_provider = Some(provider);
tracing::info!("OpenTelemetry OTLP span export enabled");
Some(tracing_opentelemetry::layer().with_tracer(tracer).boxed())
}
#[cfg(not(feature = "otel"))]
fn build_otel_layer(_guard: &mut LogGuard) -> Option<tracing_subscriber::layer::Identity> {
None
}
fn install_panic_hook() {
let default_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
let backtrace = std::backtrace::Backtrace::force_capture();
let location = info
.location()
.map(|l| format!("{}:{}:{}", l.file(), l.line(), l.column()))
.unwrap_or_else(|| "unknown".to_string());
let thread = std::thread::current()
.name()
.unwrap_or("unnamed")
.to_string();
let message = info
.payload()
.downcast_ref::<&str>()
.map(|s| s.to_string())
.or_else(|| info.payload().downcast_ref::<String>().cloned())
.unwrap_or_else(|| "<non-string panic payload>".to_string());
tracing::error!(
location = %location,
thread = %thread,
"panic: {message}\n{backtrace}"
);
if let Err(e) = append_panic_log(&location, &thread, &message, &backtrace) {
eprintln!("failed to write panic log: {e}");
}
rayfish::dns_config::emergency_restore_resolv_conf();
default_hook(info);
std::process::abort();
}));
}
fn append_panic_log(
location: &str,
thread: &str,
message: &str,
backtrace: &std::backtrace::Backtrace,
) -> std::io::Result<()> {
use std::io::Write as _;
let dir = logdir::log_dir();
std::fs::create_dir_all(&dir)?;
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let mut f = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(dir.join("panic.log"))?;
writeln!(f, "=== panic @ unix {ts} ===")?;
writeln!(f, "thread: {thread}")?;
writeln!(f, "location: {location}")?;
writeln!(f, "message: {message}")?;
writeln!(f, "backtrace:\n{backtrace}\n")?;
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
if cli.json {
JSON_FLAG.store(true, atomic::Ordering::Relaxed);
style::set_plain(true);
}
let _log_guard = init_tracing(matches!(cli.command, Command::Daemon));
match cli.command {
Command::Leave { name } => ipc_leave(&name).await,
Command::Create {
open,
closed: _,
name,
hostname,
tor,
} => {
let mode = if open {
GroupMode::Open
} else {
GroupMode::Restricted
};
ipc_create(mode, name, hostname, tor).await
}
Command::Join {
network_key,
name,
hostname,
tor,
auto_accept_firewall,
} => {
ipc_join(
&network_key,
name.as_deref(),
hostname,
tor,
auto_accept_firewall,
)
.await
}
Command::Nuke { name, force } => ipc_nuke(&name, force).await,
Command::Status => ipc_status().await,
Command::Report => ipc_report().await,
Command::Daemon => {
check_root();
install_panic_hook();
let token = shutdown::token();
let stats = Arc::new(stats::ForwardMetrics::default());
stats.spawn_logger(token.clone());
daemon::run_daemon(token, stats).await
}
Command::Up { hostname } => cmd_up(hostname).await,
Command::Down => ipc_down().await,
Command::Stop => cmd_stop().await,
Command::Start => cmd_start().await,
Command::Uninstall => cmd_uninstall_service(),
Command::Install => cmd_install().await,
Command::Restart => cmd_restart().await,
Command::Completions { shell } => {
clap_complete::generate(shell, &mut Cli::command(), "ray", &mut std::io::stdout());
Ok(())
}
Command::Invite { network, action } => ipc_invite(&network, action).await,
Command::Requests { network } => ipc_requests(&network).await,
Command::Accept { network, id } => ipc_accept_request(&network, &id).await,
Command::Deny { network, id } => ipc_deny_request(&network, &id).await,
Command::Connect {
contact_id,
hostname,
} => ipc_connect(&contact_id, hostname).await,
Command::Connections { action } => ipc_connections(action).await,
Command::Contact { action } => ipc_contact(action).await,
Command::Ping {
peer,
count,
interval,
} => ipc_ping(&peer, count, interval).await,
Command::Netcheck => ipc_netcheck().await,
Command::Admin { network, action } => ipc_admin(&network, action).await,
Command::Firewall { action } => ipc_firewall(action).await,
Command::Apply {
spec,
prune,
dry_run,
invite_missing,
example,
} => ipc_apply(spec, prune, dry_run, invite_missing, example).await,
Command::Hostname { network, name } => ipc_set_hostname(&network, &name).await,
Command::Identityof { network, hostname } => {
cmd_identityof(&network, &hostname, cli.json).await
}
Command::Mdns { state } => cmd_mdns(&state),
Command::Config { action } => cmd_config(action, cli.json),
Command::SetOperator { user } => cmd_set_operator(&user).await,
Command::Send { file, peer } => ipc_send_file(&file, &peer).await,
Command::Files { action } => ipc_files(action).await,
Command::Pair { action, ticket } => cmd_pair(action, ticket).await,
Command::Version => {
println!("ray {FULL_VERSION}");
Ok(())
}
Command::Update {
force,
check,
nightly,
list,
version,
} => cmd_update(force, check, nightly, list, version).await,
}
}
fn cmd_mdns(state: &str) -> Result<()> {
let enabled = match state {
"on" => true,
"off" => false,
_ => {
eprintln!("Usage: rayfish mdns <on|off>");
std::process::exit(1);
}
};
let mut app_config = config::load()?;
app_config.mdns_enabled = enabled;
config::save_settings(&app_config)?;
println!(
"mDNS discovery {}. Restart the daemon for changes to take effect.",
if enabled { "enabled" } else { "disabled" }
);
Ok(())
}
fn cmd_config(action: Option<ConfigAction>, json: bool) -> Result<()> {
match action.unwrap_or(ConfigAction::Get { key: None }) {
ConfigAction::Get { key } => {
let cfg = config::load()?;
let rows = config::config_get(&cfg, key.as_deref())?;
if json {
let map: serde_json::Map<String, serde_json::Value> = rows
.into_iter()
.map(|(k, v)| (k, serde_json::Value::String(v)))
.collect();
print_json(&serde_json::Value::Object(map));
} else {
for (k, v) in rows {
println!("{k} = {v}");
}
}
}
ConfigAction::Set { key, value, replace } => {
let mut cfg = config::load()?;
config::config_set(&mut cfg, &key, &value, replace)?;
config::save_settings(&cfg)?;
println!("Set {key}. Run 'sudo ray restart' for changes to take effect.");
}
ConfigAction::Unset { key } => {
let mut cfg = config::load()?;
config::config_set(&mut cfg, &key, "", false)?;
config::save_settings(&cfg)?;
println!("Reset {key} to default. Run 'sudo ray restart' for changes to take effect.");
}
}
Ok(())
}
fn uid_for_user(user: &str) -> Option<u32> {
use std::ffi::CString;
let cname = CString::new(user).ok()?;
let pw = unsafe { libc::getpwnam(cname.as_ptr()) };
if !pw.is_null() {
return Some(unsafe { (*pw).pw_uid });
}
user.parse::<u32>().ok()
}
async fn cmd_set_operator(user: &str) -> Result<()> {
let uid = uid_for_user(user)
.ok_or_else(|| anyhow::anyhow!("unknown user '{user}' (pass a valid username or UID)"))?;
let mut stream = ipc::connect()
.await
.context("rayfish daemon is not running; start it with: sudo ray up")?;
ipc::send(&mut stream, ipc::IpcMessage::SetOperator { uid }).await?;
match ipc::recv(&mut stream).await? {
ipc::IpcMessage::Ok { message } => println!("{message}"),
ipc::IpcMessage::Error { message } => {
print_error("error", &message, None);
std::process::exit(1);
}
other => eprintln!("Unexpected response: {other:?}"),
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use ipc::FirewallRuleView;
#[test]
fn strip_deleted_suffix_sanitizes_replaced_binary_path() {
assert_eq!(
strip_deleted_suffix("/usr/local/bin/ray (deleted)"),
"/usr/local/bin/ray"
);
assert_eq!(
strip_deleted_suffix("/usr/local/bin/ray"),
"/usr/local/bin/ray"
);
assert_eq!(
strip_deleted_suffix("/opt/ray (deleted)/ray"),
"/opt/ray (deleted)/ray"
);
}
#[test]
fn parse_suggest_token_defaults_peer_to_any_for_bare_proto() {
assert_eq!(
parse_suggest_token("tcp:22", "--allow").unwrap(),
("*".to_string(), "tcp:22".to_string())
);
assert_eq!(
parse_suggest_token("udp:53", "--allow").unwrap(),
("*".to_string(), "udp:53".to_string())
);
assert_eq!(
parse_suggest_token("icmp", "--allow").unwrap(),
("*".to_string(), "icmp".to_string())
);
assert_eq!(
parse_suggest_token("any:*", "--allow").unwrap(),
("*".to_string(), "any:*".to_string())
);
}
#[test]
fn parse_suggest_token_keeps_explicit_peer() {
assert_eq!(
parse_suggest_token("earn01:tcp:9000,tcp:8123", "--allow").unwrap(),
("earn01".to_string(), "tcp:9000,tcp:8123".to_string())
);
assert_eq!(
parse_suggest_token("*:tcp:22", "--allow").unwrap(),
("*".to_string(), "tcp:22".to_string())
);
assert_eq!(
parse_suggest_token("alice:icmp", "--deny").unwrap(),
("alice".to_string(), "icmp".to_string())
);
}
#[test]
fn parse_suggest_token_rejects_empty() {
assert!(parse_suggest_token("", "--allow").is_err());
assert!(parse_suggest_token("alice", "--allow").is_err());
}
#[test]
fn release_asset_name_maps_supported_platforms() {
assert_eq!(
release_asset_name("linux", "x86_64").unwrap(),
"ray-linux-x86_64"
);
assert_eq!(
release_asset_name("linux", "aarch64").unwrap(),
"ray-linux-aarch64"
);
assert_eq!(
release_asset_name("macos", "x86_64").unwrap(),
"ray-macos-x86_64"
);
assert_eq!(
release_asset_name("macos", "aarch64").unwrap(),
"ray-macos-aarch64"
);
}
#[test]
fn release_asset_name_rejects_unsupported_platforms() {
assert!(release_asset_name("windows", "x86_64").is_err());
assert!(release_asset_name("linux", "riscv64").is_err());
}
#[test]
fn normalize_version_strips_leading_v() {
assert_eq!(normalize_version("v0.1.0"), "0.1.0");
assert_eq!(normalize_version("0.1.0"), "0.1.0");
assert_eq!(normalize_version("v1.2.3-rc1"), "1.2.3-rc1");
}
#[test]
fn version_is_newer_orders_semver() {
assert!(version_is_newer("0.2.0", "0.1.0"));
assert!(version_is_newer("1.0.0", "0.9.9"));
assert!(!version_is_newer("0.1.0", "0.1.0"));
assert!(!version_is_newer("0.1.0", "0.2.0")); assert!(version_is_newer("0.1.0", "0.1.0-rc1")); assert!(version_is_newer("nightly", "0.1.0"));
assert!(!version_is_newer("weird", "weird"));
}
fn view(
dir: &str,
action: &str,
proto: &str,
port: &str,
peer: &str,
net: &str,
sugg: Option<&str>,
) -> FirewallRuleView {
FirewallRuleView {
direction: dir.parse().unwrap(),
action: action.parse().unwrap(),
protocol: proto.parse().unwrap(),
port: port.into(),
peer: peer.into(),
network: net.into(),
suggested_by: sugg.map(str::to_string),
}
}
#[test]
fn firewall_table_aligns_without_color() {
style::set_plain(true);
let rules = vec![
view("in", "allow", "tcp", "443", "any", "any", None),
view(
"out",
"deny",
"udp",
"53",
"abc1",
"homelab",
Some("homelab"),
),
];
let out = render_firewall_rules(
Some((firewall::Action::Allow, firewall::Action::Allow)),
false,
&rules,
);
assert!(out.contains("default in allow"));
assert!(out.contains("default out allow"));
let lines: Vec<&str> = out
.lines()
.filter(|l| l.contains("allow") || l.contains("deny"))
.collect();
assert!(out.contains("·suggested by homelab·"));
assert!(!out.contains('\u{1b}'));
assert!(lines.iter().any(|l| l.contains("443")));
}
#[test]
fn empty_firewall_says_no_rules() {
style::set_plain(true);
let out =
render_firewall_rules(Some((firewall::Action::Deny, firewall::Action::Allow)), false, &[]);
assert!(out.contains("default in deny"));
assert!(out.contains("default out allow"));
assert!(out.contains("(no rules)"));
}
}