pub mod sam;
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use rns_core::constants;
use rns_core::transport::types::{InterfaceId, InterfaceInfo};
use crate::event::{Event, EventSender};
use crate::hdlc;
use crate::interface::Writer;
use self::sam::{Destination, SamError};
/// Hardware MTU constant (the name suggests bytes — TODO confirm). Nothing in the visible code
/// reads it, which is why the `dead_code` lint is allowed here.
#[allow(dead_code)]
const HW_MTU: usize = 1064;
/// Nominal bitrate reported in `InterfaceInfo::bitrate` for every I2P peer interface
/// (presumably bits per second — TODO confirm against `rns_core`).
const BITRATE_GUESS: u64 = 256_000;
/// Default delay between outbound reconnect attempts; used to seed `I2pRuntime::reconnect_wait`.
const RECONNECT_WAIT: Duration = Duration::from_secs(15);
/// Configuration for one I2P interface, consumed by `start` / `coordinator`.
#[derive(Debug, Clone)]
pub struct I2pConfig {
/// Interface name; used as the log prefix and, after `sanitize_name`, as the SAM session id
/// and as part of the key file name.
pub name: String,
/// Identifier of this interface; its numeric value is used in the coordinator thread name.
pub interface_id: InterfaceId,
/// Host part of the SAM bridge address.
pub sam_host: String,
/// Port part of the SAM bridge address.
pub sam_port: u16,
/// Outbound peers. Entries ending in `.i2p` are resolved through a SAM naming lookup;
/// anything else is passed to the SAM stream connect call unchanged.
pub peers: Vec<String>,
/// When `true`, an acceptor thread is spawned to take inbound connections.
pub connectable: bool,
/// Directory in which the `i2p_<name>.key` private key file is stored.
pub storage_dir: PathBuf,
/// Ingress-control settings copied into the `InterfaceInfo` of every peer interface.
pub ingress_control: rns_core::transport::types::IngressControlConfig,
/// Shared, mutable runtime settings read by the outbound peer threads.
pub runtime: Arc<Mutex<I2pRuntime>>,
}
/// Runtime-adjustable settings of an I2P interface.
#[derive(Debug, Clone)]
pub struct I2pRuntime {
/// How long an outbound peer thread waits before its next connection attempt.
pub reconnect_wait: Duration,
}
impl I2pRuntime {
    /// Builds the startup runtime settings for an interface.
    ///
    /// The configuration argument is currently ignored: every interface starts out with the
    /// module-wide `RECONNECT_WAIT` delay.
    pub fn from_config(_config: &I2pConfig) -> Self {
        let reconnect_wait = RECONNECT_WAIT;
        Self { reconnect_wait }
    }
}
/// Handle onto the runtime settings of one I2P interface (built by
/// `i2p_runtime_handle_from_config`).
#[derive(Debug, Clone)]
pub struct I2pRuntimeConfigHandle {
/// Name of the interface the settings belong to.
pub interface_name: String,
/// The live settings, shared with the interface's configuration.
pub runtime: Arc<Mutex<I2pRuntime>>,
/// The settings as computed by `I2pRuntime::from_config` when the handle was created.
pub startup: I2pRuntime,
}
impl Default for I2pConfig {
    /// Returns a configuration pointing at a local SAM bridge (`127.0.0.1:7656`) with no peers,
    /// inbound connections disabled and the current directory as key storage.
    fn default() -> Self {
        // A provisional runtime is needed to build the struct at all; it is replaced below by
        // the one derived from the finished configuration.
        let provisional = I2pRuntime {
            reconnect_wait: RECONNECT_WAIT,
        };
        let mut defaults = I2pConfig {
            name: String::new(),
            interface_id: InterfaceId(0),
            sam_host: String::from("127.0.0.1"),
            sam_port: 7656,
            peers: Vec::new(),
            connectable: false,
            storage_dir: PathBuf::from("."),
            ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
            runtime: Arc::new(Mutex::new(provisional)),
        };
        defaults.runtime = Arc::new(Mutex::new(I2pRuntime::from_config(&defaults)));
        defaults
    }
}
/// Sending side of one I2P peer connection, handed to the event loop via `Event::InterfaceUp`.
struct I2pWriter {
// Clone (`try_clone`) of the peer's stream socket; the original handle is kept by the reader.
stream: std::net::TcpStream,
}
impl Writer for I2pWriter {
    /// Wraps `data` with `hdlc::frame` and writes the whole frame to the peer stream.
    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
        let framed = hdlc::frame(data);
        self.stream.write_all(&framed)
    }
}
/// Returns `name` with every character that is not alphanumeric, `-` or `_` replaced by `_`.
///
/// The result is used for the SAM session id and for the key file name.
fn sanitize_name(name: &str) -> String {
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, '-' | '_') || ch.is_alphanumeric();
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
/// Returns the path of the private key file for interface `name`:
/// `<storage_dir>/i2p_<sanitized name>.key`.
fn key_file_path(storage_dir: &PathBuf, name: &str) -> PathBuf {
    let file_name = format!("i2p_{}.key", sanitize_name(name));
    storage_dir.join(file_name)
}
fn load_or_generate_keypair(
sam_addr: &SocketAddr,
storage_dir: &PathBuf,
name: &str,
) -> Result<sam::KeyPair, SamError> {
let key_path = key_file_path(storage_dir, name);
if key_path.exists() {
let priv_data = std::fs::read(&key_path).map_err(SamError::Io)?;
Ok(sam::KeyPair {
destination: Destination { data: Vec::new() }, private_key: priv_data,
})
} else {
log::info!("[{}] generating new I2P destination keypair", name);
let keypair = sam::dest_generate(sam_addr)?;
if let Some(parent) = key_path.parent() {
std::fs::create_dir_all(parent).map_err(SamError::Io)?;
}
std::fs::write(&key_path, &keypair.private_key).map_err(SamError::Io)?;
log::info!("[{}] saved I2P key to {:?}", name, key_path);
Ok(keypair)
}
}
pub fn start(config: I2pConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
let name = config.name.clone();
thread::Builder::new()
.name(format!("i2p-coord-{}", config.interface_id.0))
.spawn(move || {
if let Err(e) = coordinator(config, tx, next_id) {
log::error!("[{}] I2P coordinator failed: {}", name, e);
}
})?;
Ok(())
}
fn coordinator(
config: I2pConfig,
tx: EventSender,
next_id: Arc<AtomicU64>,
) -> Result<(), SamError> {
let ingress_control = config.ingress_control;
let sam_addr: SocketAddr = format!("{}:{}", config.sam_host, config.sam_port)
.parse()
.map_err(|e| SamError::Io(io::Error::new(io::ErrorKind::InvalidInput, e)))?;
let keypair = load_or_generate_keypair(&sam_addr, &config.storage_dir, &config.name)?;
let priv_b64 = sam::i2p_base64_encode(&keypair.private_key);
let session_id = sanitize_name(&config.name);
log::info!("[{}] creating SAM session (id={})", config.name, session_id);
let mut control_socket = sam::session_create(&sam_addr, &session_id, &priv_b64)?;
match sam::naming_lookup_on(&mut control_socket, "ME") {
Ok(our_dest) => {
let b32 = our_dest.base32_address();
log::info!("[{}] I2P address: {}", config.name, b32);
}
Err(e) => {
log::warn!("[{}] could not look up own destination: {}", config.name, e);
}
}
for peer_addr in &config.peers {
let peer_addr = peer_addr.trim().to_string();
if peer_addr.is_empty() {
continue;
}
let tx2 = tx.clone();
let next_id2 = next_id.clone();
let sam_addr2 = sam_addr;
let session_id2 = session_id.clone();
let iface_name = config.name.clone();
let runtime = Arc::clone(&config.runtime);
thread::Builder::new()
.name(format!("i2p-out-{}", peer_addr))
.spawn(move || {
outbound_peer_loop(
sam_addr2,
&session_id2,
&peer_addr,
&iface_name,
tx2,
next_id2,
runtime,
ingress_control,
);
})
.ok();
}
if config.connectable {
let tx2 = tx.clone();
let next_id2 = next_id.clone();
let sam_addr2 = sam_addr;
let session_id2 = session_id.clone();
let iface_name = config.name.clone();
thread::Builder::new()
.name("i2p-acceptor".into())
.spawn(move || {
acceptor_loop(
sam_addr2,
&session_id2,
&iface_name,
tx2,
next_id2,
ingress_control,
);
})
.ok();
}
let _keep_alive = control_socket;
loop {
thread::sleep(Duration::from_secs(3600));
}
}
fn outbound_peer_loop(
sam_addr: SocketAddr,
session_id: &str,
peer_addr: &str,
iface_name: &str,
tx: EventSender,
next_id: Arc<AtomicU64>,
runtime: Arc<Mutex<I2pRuntime>>,
ingress_control: rns_core::transport::types::IngressControlConfig,
) {
loop {
log::info!("[{}] connecting to I2P peer {}", iface_name, peer_addr);
let destination = if peer_addr.ends_with(".i2p") {
match sam::naming_lookup(&sam_addr, peer_addr) {
Ok(dest) => dest.to_i2p_base64(),
Err(e) => {
log::warn!("[{}] failed to resolve {}: {}", iface_name, peer_addr, e);
thread::sleep(runtime.lock().unwrap().reconnect_wait);
continue;
}
}
} else {
peer_addr.to_string()
};
match sam::stream_connect(&sam_addr, session_id, &destination) {
Ok(stream) => {
let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
log::info!(
"[{}] connected to I2P peer {} → id {}",
iface_name,
peer_addr,
client_id.0
);
let writer_stream = match stream.try_clone() {
Ok(s) => s,
Err(e) => {
log::warn!("[{}] failed to clone stream: {}", iface_name, e);
thread::sleep(runtime.lock().unwrap().reconnect_wait);
continue;
}
};
let writer: Box<dyn Writer> = Box::new(I2pWriter {
stream: writer_stream,
});
let info = InterfaceInfo {
id: client_id,
name: format!("I2PInterface/{}", peer_addr),
mode: constants::MODE_FULL,
out_capable: true,
in_capable: true,
bitrate: Some(BITRATE_GUESS),
announce_rate_target: None,
announce_rate_grace: 0,
announce_rate_penalty: 0.0,
announce_cap: constants::ANNOUNCE_CAP,
is_local_client: false,
wants_tunnel: false,
tunnel_id: None,
mtu: 65535,
ia_freq: 0.0,
started: 0.0,
ingress_control,
};
if tx
.send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
.is_err()
{
return; }
peer_reader_loop(stream, client_id, iface_name, &tx);
let _ = tx.send(Event::InterfaceDown(client_id));
log::warn!(
"[{}] I2P peer {} disconnected, reconnecting in {}s",
iface_name,
peer_addr,
runtime.lock().unwrap().reconnect_wait.as_secs()
);
}
Err(e) => {
log::warn!(
"[{}] failed to connect to I2P peer {}: {}",
iface_name,
peer_addr,
e
);
}
}
thread::sleep(runtime.lock().unwrap().reconnect_wait);
}
}
/// Accepts inbound I2P connections for the session `session_id`, forever.
///
/// For every accepted stream a fresh interface id is taken from `next_id`, the interface is
/// announced with `Event::InterfaceUp` (carrying a writer on a cloned socket), and a dedicated
/// `i2p-client-<id>` thread reads frames until the peer goes away and then sends
/// `Event::InterfaceDown`.
///
/// A failed accept is retried after one second. The function only returns when the event
/// channel is closed.
fn acceptor_loop(
    sam_addr: SocketAddr,
    session_id: &str,
    iface_name: &str,
    tx: EventSender,
    next_id: Arc<AtomicU64>,
    ingress_control: rns_core::transport::types::IngressControlConfig,
) {
    loop {
        match sam::stream_accept(&sam_addr, session_id) {
            Ok((stream, remote_dest)) => {
                let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
                let remote_b32 = remote_dest.base32_address();
                log::info!(
                    "[{}] accepted I2P connection from {} → id {}",
                    iface_name,
                    remote_b32,
                    client_id.0
                );

                // The writer gets its own handle on the socket; the reader thread keeps the
                // original one.
                let writer_stream = match stream.try_clone() {
                    Ok(s) => s,
                    Err(e) => {
                        log::warn!("[{}] failed to clone accepted stream: {}", iface_name, e);
                        continue;
                    }
                };
                let writer: Box<dyn Writer> = Box::new(I2pWriter {
                    stream: writer_stream,
                });

                let info = InterfaceInfo {
                    id: client_id,
                    name: format!("I2PInterface/{}", remote_b32),
                    mode: constants::MODE_FULL,
                    out_capable: true,
                    in_capable: true,
                    bitrate: Some(BITRATE_GUESS),
                    announce_rate_target: None,
                    announce_rate_grace: 0,
                    announce_rate_penalty: 0.0,
                    announce_cap: constants::ANNOUNCE_CAP,
                    is_local_client: false,
                    wants_tunnel: false,
                    tunnel_id: None,
                    mtu: 65535,
                    ia_freq: 0.0,
                    started: 0.0,
                    ingress_control,
                };

                if tx
                    .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
                    .is_err()
                {
                    // Nobody is listening for events any more: stop accepting.
                    return;
                }

                let client_tx = tx.clone();
                let client_name = iface_name.to_string();
                let spawned = thread::Builder::new()
                    .name(format!("i2p-client-{}", client_id.0))
                    .spawn(move || {
                        peer_reader_loop(stream, client_id, &client_name, &client_tx);
                        let _ = client_tx.send(Event::InterfaceDown(client_id));
                    });

                if let Err(e) = spawned {
                    // The interface was already announced but nothing will ever read from it
                    // (the closure, and with it the stream, has been dropped). Take it down
                    // again so it does not stay registered as a dead interface.
                    log::warn!(
                        "[{}] failed to spawn reader thread for I2P client {}: {}",
                        iface_name,
                        client_id.0,
                        e
                    );
                    if tx.send(Event::InterfaceDown(client_id)).is_err() {
                        return;
                    }
                }
            }
            Err(e) => {
                log::warn!("[{}] I2P accept failed: {}, retrying", iface_name, e);
                thread::sleep(Duration::from_secs(1));
            }
        }
    }
}
fn peer_reader_loop(
mut stream: std::net::TcpStream,
id: InterfaceId,
name: &str,
tx: &EventSender,
) {
let mut decoder = hdlc::Decoder::new();
let mut buf = [0u8; 4096];
loop {
match stream.read(&mut buf) {
Ok(0) => {
log::info!("[{}] I2P peer {} disconnected", name, id.0);
return;
}
Ok(n) => {
for frame in decoder.feed(&buf[..n]) {
if tx
.send(Event::Frame {
interface_id: id,
data: frame,
})
.is_err()
{
return; }
}
}
Err(e) => {
log::warn!("[{}] I2P peer {} read error: {}", name, id.0, e);
return;
}
}
}
}
use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
use std::collections::HashMap;
/// Stateless `InterfaceFactory` for the `"I2PInterface"` interface type.
pub struct I2pFactory;
impl InterfaceFactory for I2pFactory {
fn type_name(&self) -> &str {
"I2PInterface"
}
fn parse_config(
&self,
name: &str,
id: InterfaceId,
params: &HashMap<String, String>,
) -> Result<Box<dyn InterfaceConfigData>, String> {
let sam_host = params
.get("sam_host")
.cloned()
.unwrap_or_else(|| "127.0.0.1".into());
let sam_port = params
.get("sam_port")
.and_then(|v| v.parse::<u16>().ok())
.unwrap_or(7656);
let connectable = params
.get("connectable")
.and_then(|v| crate::config::parse_bool_pub(v))
.unwrap_or(false);
let peers = params
.get("peers")
.map(|v| {
v.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect::<Vec<String>>()
})
.unwrap_or_default();
let storage_dir = params
.get("storage_dir")
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("/tmp/rns-i2p"));
Ok(Box::new(I2pConfig {
name: name.to_string(),
interface_id: id,
sam_host,
sam_port,
connectable,
peers,
storage_dir,
ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
runtime: Arc::new(Mutex::new(I2pRuntime {
reconnect_wait: RECONNECT_WAIT,
})),
}))
}
fn start(
&self,
config: Box<dyn InterfaceConfigData>,
ctx: StartContext,
) -> io::Result<StartResult> {
let mut cfg = *config
.into_any()
.downcast::<I2pConfig>()
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
cfg.ingress_control = ctx.ingress_control;
start(cfg, ctx.tx, ctx.next_dynamic_id)?;
Ok(StartResult::Listener { control: None })
}
}
/// Creates a runtime-configuration handle for the interface described by `config`.
///
/// The handle shares the configuration's live `runtime` settings and carries a separate copy
/// of the startup values as computed by `I2pRuntime::from_config`.
pub(crate) fn i2p_runtime_handle_from_config(config: &I2pConfig) -> I2pRuntimeConfigHandle {
    let startup = I2pRuntime::from_config(config);
    let runtime = Arc::clone(&config.runtime);
    let interface_name = config.name.clone();
    I2pRuntimeConfigHandle {
        interface_name,
        runtime,
        startup,
    }
}