use anyhow::Result;
use bytes::Bytes;
use iroh::endpoint::{AckFrequencyConfig, Connection, QuicTransportConfig, VarInt};
use iroh::{Endpoint, EndpointAddr, PublicKey, RelayMode, SecretKey};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use tokio_util::sync::CancellationToken;
use iroh_quinn_proto::congestion;
#[derive(Debug)]
struct NoopController {
mtu: u16,
}
impl congestion::Controller for NoopController {
fn on_congestion_event(
&mut self,
_now: std::time::Instant,
_sent: std::time::Instant,
_is_persistent_congestion: bool,
_in_flight_loss: bool,
_lost_bytes: u64,
) {
}
fn on_mtu_update(&mut self, new_mtu: u16) {
self.mtu = new_mtu;
}
fn window(&self) -> u64 {
u64::MAX
}
fn clone_box(&self) -> Box<dyn congestion::Controller> {
Box::new(NoopController { mtu: self.mtu })
}
fn initial_window(&self) -> u64 {
u64::MAX
}
fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
self
}
}
struct NoopControllerFactory;
impl congestion::ControllerFactory for NoopControllerFactory {
fn build(
self: Arc<Self>,
_now: std::time::Instant,
current_mtu: u16,
) -> Box<dyn congestion::Controller> {
Box::new(NoopController { mtu: current_mtu })
}
}
fn low_latency_transport_config() -> QuicTransportConfig {
let mut ack_freq = AckFrequencyConfig::default();
ack_freq.ack_eliciting_threshold(VarInt::from_u32(0));
ack_freq.max_ack_delay(Some(Duration::from_millis(1)));
QuicTransportConfig::builder()
.initial_rtt(Duration::from_millis(10))
.keep_alive_interval(Duration::from_secs(1))
.ack_frequency_config(Some(ack_freq))
.congestion_controller_factory(Arc::new(NoopControllerFactory))
.enable_segmentation_offload(false)
.build()
}
pub const DEFAULT_RELAY_URL: &str = "https://cdn.1ms.ai:3341";
pub const DEFAULT_ALPN: &[u8] = b"xoq/p2p/0";
pub const CAMERA_ALPN: &[u8] = b"xoq/camera/0";
pub const CAMERA_ALPN_JPEG: &[u8] = b"xoq/camera-jpeg/0";
pub const CAMERA_ALPN_H264: &[u8] = b"xoq/camera-h264/0";
pub const CAMERA_ALPN_HEVC: &[u8] = b"xoq/camera-hevc/0";
pub const CAMERA_ALPN_AV1: &[u8] = b"xoq/camera-av1/0";
pub struct IrohServerBuilder {
key_path: Option<PathBuf>,
secret_key: Option<SecretKey>,
alpn: Vec<u8>,
relay_url: Option<String>,
}
impl IrohServerBuilder {
pub fn new() -> Self {
Self {
key_path: None,
secret_key: None,
alpn: DEFAULT_ALPN.to_vec(),
relay_url: None,
}
}
pub fn identity_path(mut self, path: impl Into<PathBuf>) -> Self {
self.key_path = Some(path.into());
self
}
pub fn secret_key(mut self, key: SecretKey) -> Self {
self.secret_key = Some(key);
self
}
pub fn alpn(mut self, alpn: &[u8]) -> Self {
self.alpn = alpn.to_vec();
self
}
pub fn relay_url(mut self, url: impl Into<String>) -> Self {
self.relay_url = Some(url.into());
self
}
pub async fn bind(self) -> Result<IrohServer> {
let secret_key = match (self.secret_key, self.key_path) {
(Some(key), _) => key,
(None, Some(path)) => load_or_generate_key(&path).await?,
(None, None) => SecretKey::generate(&mut rand::rng()),
};
let relay_url_str = self.relay_url.as_deref().unwrap_or(DEFAULT_RELAY_URL);
let relay_url: iroh::RelayUrl = relay_url_str
.parse()
.map_err(|e| anyhow::anyhow!("Invalid relay URL '{}': {}", relay_url_str, e))?;
tracing::info!("Iroh relay: {}", relay_url_str);
let endpoint = Endpoint::builder()
.alpns(vec![self.alpn])
.secret_key(secret_key)
.relay_mode(RelayMode::custom([relay_url]))
.transport_config(low_latency_transport_config())
.bind()
.await?;
match tokio::time::timeout(Duration::from_secs(5), endpoint.online()).await {
Ok(_) => {
tracing::info!("Iroh server: relay connected, low-latency transport config active")
}
Err(_) => tracing::warn!("Iroh server: relay timeout (5s), starting without relay"),
}
Ok(IrohServer { endpoint })
}
}
impl Default for IrohServerBuilder {
fn default() -> Self {
Self::new()
}
}
pub struct IrohClientBuilder {
alpn: Vec<u8>,
relay_url: Option<String>,
}
impl IrohClientBuilder {
pub fn new() -> Self {
Self {
alpn: DEFAULT_ALPN.to_vec(),
relay_url: None,
}
}
pub fn alpn(mut self, alpn: &[u8]) -> Self {
self.alpn = alpn.to_vec();
self
}
pub fn relay_url(mut self, url: impl Into<String>) -> Self {
self.relay_url = Some(url.into());
self
}
pub async fn connect(self, server_id: PublicKey) -> Result<IrohConnection> {
let relay_url_str = self.relay_url.as_deref().unwrap_or(DEFAULT_RELAY_URL);
let relay_url: iroh::RelayUrl = relay_url_str
.parse()
.map_err(|e| anyhow::anyhow!("Invalid relay URL '{}': {}", relay_url_str, e))?;
tracing::info!("Iroh relay: {}", relay_url_str);
let endpoint = Endpoint::builder()
.relay_mode(RelayMode::custom([relay_url]))
.transport_config(low_latency_transport_config())
.bind()
.await?;
let addr = EndpointAddr::from(server_id);
let conn = endpoint.connect(addr, &self.alpn).await?;
Ok(IrohConnection {
conn,
_endpoint: endpoint,
cancel_token: CancellationToken::new(),
})
}
pub async fn connect_str(self, server_id: &str) -> Result<IrohConnection> {
let id: PublicKey = server_id.parse()?;
self.connect(id).await
}
}
impl Default for IrohClientBuilder {
fn default() -> Self {
Self::new()
}
}
pub struct IrohServer {
endpoint: Endpoint,
}
impl IrohServer {
pub fn id(&self) -> PublicKey {
self.endpoint.id()
}
pub fn addr(&self) -> EndpointAddr {
self.endpoint.addr()
}
pub async fn accept(&self) -> Result<Option<IrohConnection>> {
if let Some(incoming) = self.endpoint.accept().await {
let conn = incoming.await?;
return Ok(Some(IrohConnection {
conn,
_endpoint: self.endpoint.clone(),
cancel_token: CancellationToken::new(),
}));
}
Ok(None)
}
pub fn endpoint(&self) -> &Endpoint {
&self.endpoint
}
}
pub struct IrohConnection {
conn: Connection,
_endpoint: Endpoint,
cancel_token: CancellationToken,
}
impl IrohConnection {
pub fn remote_id(&self) -> PublicKey {
self.conn.remote_id()
}
pub async fn open_stream(&self) -> Result<IrohStream> {
let (send, recv) = self.conn.open_bi().await?;
Ok(IrohStream { send, recv })
}
pub async fn accept_stream(&self) -> Result<IrohStream> {
let (send, recv) = self.conn.accept_bi().await?;
Ok(IrohStream { send, recv })
}
pub fn connection(&self) -> &Connection {
&self.conn
}
pub fn send_datagram(&self, data: Bytes) -> Result<()> {
self.conn.send_datagram(data)?;
Ok(())
}
pub async fn recv_datagram(&self) -> Result<Bytes> {
Ok(self.conn.read_datagram().await?)
}
pub fn max_datagram_size(&self) -> Option<usize> {
self.conn.max_datagram_size()
}
pub fn cancellation_token(&self) -> CancellationToken {
self.cancel_token.clone()
}
}
impl Drop for IrohConnection {
fn drop(&mut self) {
self.cancel_token.cancel();
}
}
pub struct IrohStream {
send: iroh::endpoint::SendStream,
recv: iroh::endpoint::RecvStream,
}
impl IrohStream {
pub async fn write(&mut self, data: &[u8]) -> Result<()> {
self.send.write_all(data).await?;
Ok(())
}
pub async fn flush(&mut self) -> Result<()> {
tokio::task::yield_now().await;
Ok(())
}
pub async fn write_str(&mut self, data: &str) -> Result<()> {
self.write(data.as_bytes()).await
}
pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>> {
Ok(self.recv.read(buf).await?)
}
pub async fn read_string(&mut self) -> Result<Option<String>> {
let mut buf = vec![0u8; 4096];
if let Some(n) = self.read(&mut buf).await? {
return Ok(Some(String::from_utf8_lossy(&buf[..n]).to_string()));
}
Ok(None)
}
pub fn split(self) -> (iroh::endpoint::SendStream, iroh::endpoint::RecvStream) {
(self.send, self.recv)
}
}
async fn load_or_generate_key(path: &PathBuf) -> Result<SecretKey> {
if path.exists() {
let bytes = fs::read(path).await?;
let key_bytes: [u8; 32] = bytes
.try_into()
.map_err(|_| anyhow::anyhow!("Invalid key file"))?;
Ok(SecretKey::from_bytes(&key_bytes))
} else {
let key = SecretKey::generate(&mut rand::rng());
fs::write(path, key.to_bytes()).await?;
Ok(key)
}
}
pub fn generate_key() -> SecretKey {
SecretKey::generate(&mut rand::rng())
}
pub async fn save_key(key: &SecretKey, path: impl Into<PathBuf>) -> Result<()> {
fs::write(path.into(), key.to_bytes()).await?;
Ok(())
}
pub async fn load_key(path: impl Into<PathBuf>) -> Result<SecretKey> {
let bytes = fs::read(path.into()).await?;
let key_bytes: [u8; 32] = bytes
.try_into()
.map_err(|_| anyhow::anyhow!("Invalid key file"))?;
Ok(SecretKey::from_bytes(&key_bytes))
}