use anyhow::{Context, Result};
use iroh::{
Endpoint, EndpointAddr, EndpointId, RelayMode, RelayUrl, SecretKey,
address_lookup::{PkarrPublisher, PkarrResolver},
endpoint::Builder,
endpoint::Connection,
endpoint::presets,
};
use crate::config::ServerOverride;
#[cfg(feature = "tor")]
use std::sync::Arc;
/// ALPN identifier `rayfish/files/1`.
///
/// NOTE(review): presumably the file-transfer protocol — its handler is not
/// visible in this file; confirm before relying on this description.
pub const FILES_ALPN: &[u8] = b"rayfish/files/1";
/// ALPN identifier `rayfish/connect/1`.
///
/// NOTE(review): purpose not visible in this file; confirm against its handler.
pub const CONNECT_ALPN: &[u8] = b"rayfish/connect/1";
/// UDP port that [`create_endpoint_with_alpns`] tries to bind first (on
/// `0.0.0.0`) before falling back to an ephemeral port.
pub const RAYFISH_LISTEN_PORT: u16 = 41383;
/// Version number embedded in the per-network ALPN built by [`network_alpn`].
pub const MESH_PROTOCOL_VERSION: u32 = 1;
/// Builds the ALPN identifier for a specific network.
///
/// The result is the UTF-8 encoding of
/// `rayfish/net/<MESH_PROTOCOL_VERSION>/<prefix>`, where `<prefix>` is at most
/// the first 16 bytes of the string form of `network_pubkey`.
pub fn network_alpn(network_pubkey: &EndpointId) -> Vec<u8> {
    let mut id_text = network_pubkey.to_string();
    // `truncate` leaves shorter strings untouched, so no explicit length clamp is needed.
    id_text.truncate(16);
    format!("rayfish/net/{MESH_PROTOCOL_VERSION}/{id_text}").into_bytes()
}
/// Creates an iroh endpoint that accepts the given ALPNs.
///
/// The endpoint is first bound to `0.0.0.0:RAYFISH_LISTEN_PORT`. If that
/// attempt fails, a warning is logged and a second attempt is made on
/// `0.0.0.0:0` so the OS picks an ephemeral port.
///
/// NOTE(review): the fallback is triggered by *any* error from the first
/// attempt, not only by an occupied port, so a configuration error is retried
/// once (and reported by the second attempt).
///
/// # Errors
///
/// Returns the error of the second attempt, wrapped with the context
/// `failed to bind iroh endpoint`.
pub async fn create_endpoint_with_alpns(
    secret_key: SecretKey,
    alpns: Vec<Vec<u8>>,
    tor: bool,
    relay: &ServerOverride,
    discovery: &ServerOverride,
) -> Result<Endpoint> {
    let preferred_addr = format!("0.0.0.0:{RAYFISH_LISTEN_PORT}");
    let first_attempt =
        bind_endpoint(&secret_key, &alpns, tor, &preferred_addr, relay, discovery).await;

    let endpoint = match first_attempt {
        Ok(bound) => bound,
        Err(cause) => {
            tracing::warn!(
                port = RAYFISH_LISTEN_PORT,
                error = %cause,
                "fixed UDP port unavailable; falling back to an ephemeral port"
            );
            let second_attempt =
                bind_endpoint(&secret_key, &alpns, tor, "0.0.0.0:0", relay, discovery).await;
            second_attempt.context("failed to bind iroh endpoint")?
        }
    };

    tracing::info!(id = %endpoint.id().fmt_short(), "iroh endpoint ready");
    Ok(endpoint)
}
/// Configures and binds a single iroh endpoint on `bind`.
///
/// Configuration is applied in this order: secret key, ALPNs, bind address,
/// relay override (if any), discovery override, and — when requested and
/// compiled in — the Tor custom transport.
///
/// # Errors
///
/// Fails when the bind address is rejected, when the relay or discovery
/// overrides cannot be applied, when the Tor transport cannot be created, when
/// `tor` is requested in a build without the `tor` feature, or when the final
/// bind fails.
async fn bind_endpoint(
    secret_key: &SecretKey,
    alpns: &[Vec<u8>],
    tor: bool,
    bind: &str,
    relay: &ServerOverride,
    discovery: &ServerOverride,
) -> Result<Endpoint> {
    let base = Endpoint::builder(presets::N0)
        .secret_key(secret_key.clone())
        .alpns(alpns.to_vec())
        .clear_ip_transports()
        .bind_addr(bind)
        .context("invalid bind address")?;

    // Only touch the relay mode when the override actually yields one.
    let with_relay = match build_relay_mode(relay)? {
        Some(mode) => base.relay_mode(mode),
        None => base,
    };
    let builder = apply_discovery(with_relay, discovery)?;

    #[cfg(feature = "tor")]
    let builder = if tor {
        let transport = iroh_tor_transport::TorCustomTransport::builder()
            .build(secret_key.clone())
            .await
            .context("failed to create Tor transport — is Tor running with ControlPort 9051?")?;
        let with_tor = builder
            .add_custom_transport(
                transport.clone() as Arc<dyn iroh::endpoint::transports::CustomTransport>
            )
            .address_lookup(transport.discovery());
        tracing::info!("Tor transport enabled");
        with_tor
    } else {
        builder
    };

    // Without the feature compiled in, asking for Tor is a hard error.
    #[cfg(not(feature = "tor"))]
    if tor {
        anyhow::bail!("Tor support requires building with --features tor");
    }

    builder.bind().await.context("failed to bind iroh endpoint")
}
/// Translates a relay override into an iroh [`RelayMode`].
///
/// Returns `Ok(None)` when the override yields no relay URLs. Otherwise returns
/// a custom relay mode made of the configured URLs, followed by the URLs of
/// `RelayMode::Default` unless `o.replace` is set.
///
/// # Errors
///
/// Fails when the URLs cannot be obtained from the override, or when one of
/// them does not parse as a [`RelayUrl`] (`invalid relay URL: <url>`).
pub fn build_relay_mode(o: &ServerOverride) -> Result<Option<RelayMode>> {
    let configured = crate::config::relay_urls(o)?;
    if configured.is_empty() {
        return Ok(None);
    }

    let mut relays: Vec<RelayUrl> = Vec::new();
    for raw in configured.iter() {
        let relay = raw
            .parse::<RelayUrl>()
            .with_context(|| format!("invalid relay URL: {raw}"))?;
        relays.push(relay);
    }

    // Augment mode: keep the default relays after the configured ones.
    if !o.replace {
        relays.extend(RelayMode::Default.relay_map().urls::<Vec<RelayUrl>>());
    }

    Ok(Some(RelayMode::custom(relays)))
}
/// Applies a discovery override to an endpoint builder.
///
/// When the override yields no URLs the builder is returned untouched.
/// Otherwise the existing address lookups are cleared first if `o.replace` is
/// set, and a pkarr publisher plus a pkarr resolver are registered for every
/// URL, in order.
///
/// # Errors
///
/// Fails when the URLs cannot be obtained from the override, or when one of
/// them does not parse as a URL (`invalid discovery URL: <url>`).
fn apply_discovery(builder: Builder, o: &ServerOverride) -> Result<Builder> {
    let servers = crate::config::discovery_urls(o)?;
    if servers.is_empty() {
        return Ok(builder);
    }

    let start = if o.replace {
        builder.clear_address_lookup()
    } else {
        builder
    };

    servers
        .into_iter()
        .try_fold(start, |acc, raw| -> Result<Builder> {
            let server: url::Url = raw
                .parse()
                .with_context(|| format!("invalid discovery URL: {raw}"))?;
            Ok(acc
                .address_lookup(PkarrPublisher::builder(server.clone()))
                .address_lookup(PkarrResolver::builder(server)))
        })
}
/// Accepts the next incoming connection and reports the ALPN it carries.
///
/// # Errors
///
/// Fails with `no incoming connection` when `Endpoint::accept` produces
/// nothing, and with `failed to accept connection` when the incoming
/// connection cannot be completed.
#[allow(dead_code)]
pub async fn accept_connection_with_alpn(ep: &Endpoint) -> Result<(Connection, Vec<u8>)> {
    let pending = ep.accept().await;
    let handshake = pending.context("no incoming connection")?;
    let connection = handshake.await.context("failed to accept connection")?;

    let negotiated = connection.alpn().to_vec();
    tracing::info!(
        peer = %connection.remote_id().fmt_short(),
        alpn = %String::from_utf8_lossy(&negotiated),
        "peer connected"
    );

    Ok((connection, negotiated))
}
pub async fn connect_to_peer_with_alpn(
ep: &Endpoint,
id: EndpointId,
alpn: &[u8],
) -> Result<Connection> {
let addr: EndpointAddr = id.into();
let conn = match ep.connect(addr, alpn).await {
Ok(conn) => conn,
Err(e) if is_alpn_mismatch(&e.to_string()) => {
return Err(e).context(
"no shared protocol with peer — it may be running an incompatible \
rayfish version (run `ray update`)",
);
}
Err(e) => return Err(e).context("failed to connect to peer"),
};
tracing::info!(
peer = %conn.remote_id().fmt_short(),
alpn = %String::from_utf8_lossy(alpn),
"connected to peer"
);
Ok(conn)
}
/// Reports whether an error message looks like an ALPN negotiation failure.
///
/// The check is a case-insensitive substring match for `known protocol` or
/// `application protocol`.
pub(crate) fn is_alpn_mismatch(err: &str) -> bool {
    const NEEDLES: [&str; 2] = ["known protocol", "application protocol"];

    let lowered = err.to_lowercase();
    NEEDLES.iter().any(|needle| lowered.contains(*needle))
}
#[cfg(test)]
mod tests {
    use super::*;
    use iroh::SecretKey;

    /// The network ALPN is the versioned prefix followed by the first 16
    /// characters of the key's string form.
    #[test]
    fn test_network_alpn() {
        let network_key = SecretKey::generate().public();
        let rendered = network_key.to_string();
        let want = format!("rayfish/net/{MESH_PROTOCOL_VERSION}/{}", &rendered[..16]);

        assert_eq!(network_alpn(&network_key), want.into_bytes());
    }

    /// No override yields no relay mode; `replace` keeps only the custom
    /// relay; augmenting adds it on top of the defaults.
    #[test]
    fn relay_mode_augment_vs_replace() {
        let relay_count = |replace: bool| {
            let config = ServerOverride {
                servers: vec!["https://relay.example.com".to_string()],
                replace,
            };
            build_relay_mode(&config)
                .unwrap()
                .expect("some mode")
                .relay_map()
                .urls::<Vec<RelayUrl>>()
                .len()
        };

        assert!(build_relay_mode(&ServerOverride::default()).unwrap().is_none());
        assert_eq!(relay_count(true), 1);
        assert!(relay_count(false) > 1);
    }

    /// Protocol-negotiation failures are recognised; unrelated connection
    /// errors are not.
    #[test]
    fn alpn_mismatch_classifier() {
        let mismatches = [
            "connection closed: peer doesn't support any known protocol",
            "the cryptographic handshake failed: no application protocol",
        ];
        for message in mismatches {
            assert!(is_alpn_mismatch(message));
        }

        let unrelated = ["connection timed out", "connection refused"];
        for message in unrelated {
            assert!(!is_alpn_mismatch(message));
        }
    }
}