1pub mod sam;
11
12use std::io::{self, Read, Write};
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::thread;
18use std::time::Duration;
19
20use rns_core::constants;
21use rns_core::transport::types::{InterfaceId, InterfaceInfo};
22
23use crate::event::{Event, EventSender};
24use crate::hdlc;
25use crate::interface::Writer;
26
27use self::sam::{Destination, SamError};
28
29#[allow(dead_code)]
31const HW_MTU: usize = 1064;
32
33const BITRATE_GUESS: u64 = 256_000;
35
36const RECONNECT_WAIT: Duration = Duration::from_secs(15);
38
39#[derive(Debug, Clone)]
41pub struct I2pConfig {
42 pub name: String,
43 pub interface_id: InterfaceId,
44 pub sam_host: String,
46 pub sam_port: u16,
48 pub peers: Vec<String>,
50 pub connectable: bool,
52 pub storage_dir: PathBuf,
54 pub ingress_control: rns_core::transport::types::IngressControlConfig,
55 pub runtime: Arc<Mutex<I2pRuntime>>,
56}
57
58#[derive(Debug, Clone)]
59pub struct I2pRuntime {
60 pub reconnect_wait: Duration,
61}
62
63impl I2pRuntime {
64 pub fn from_config(_config: &I2pConfig) -> Self {
65 Self {
66 reconnect_wait: RECONNECT_WAIT,
67 }
68 }
69}
70
71#[derive(Debug, Clone)]
72pub struct I2pRuntimeConfigHandle {
73 pub interface_name: String,
74 pub runtime: Arc<Mutex<I2pRuntime>>,
75 pub startup: I2pRuntime,
76}
77
78impl Default for I2pConfig {
79 fn default() -> Self {
80 let mut config = I2pConfig {
81 name: String::new(),
82 interface_id: InterfaceId(0),
83 sam_host: "127.0.0.1".into(),
84 sam_port: 7656,
85 peers: Vec::new(),
86 connectable: false,
87 storage_dir: PathBuf::from("."),
88 ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
89 runtime: Arc::new(Mutex::new(I2pRuntime {
90 reconnect_wait: RECONNECT_WAIT,
91 })),
92 };
93 let startup = I2pRuntime::from_config(&config);
94 config.runtime = Arc::new(Mutex::new(startup));
95 config
96 }
97}
98
99struct I2pWriter {
101 stream: std::net::TcpStream,
102}
103
104impl Writer for I2pWriter {
105 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
106 self.stream.write_all(&hdlc::frame(data))
107 }
108}
109
110fn sanitize_name(name: &str) -> String {
112 name.chars()
113 .map(|c| {
114 if c.is_alphanumeric() || c == '-' || c == '_' {
115 c
116 } else {
117 '_'
118 }
119 })
120 .collect()
121}
122
123fn key_file_path(storage_dir: &PathBuf, name: &str) -> PathBuf {
125 storage_dir.join(format!("i2p_{}.key", sanitize_name(name)))
126}
127
128fn load_or_generate_keypair(
131 sam_addr: &SocketAddr,
132 storage_dir: &PathBuf,
133 name: &str,
134) -> Result<sam::KeyPair, SamError> {
135 let key_path = key_file_path(storage_dir, name);
136
137 if key_path.exists() {
138 let priv_data = std::fs::read(&key_path).map_err(SamError::Io)?;
140
141 Ok(sam::KeyPair {
160 destination: Destination { data: Vec::new() }, private_key: priv_data,
162 })
163 } else {
164 log::info!("[{}] generating new I2P destination keypair", name);
166 let keypair = sam::dest_generate(sam_addr)?;
167
168 if let Some(parent) = key_path.parent() {
170 std::fs::create_dir_all(parent).map_err(SamError::Io)?;
171 }
172 std::fs::write(&key_path, &keypair.private_key).map_err(SamError::Io)?;
173 log::info!("[{}] saved I2P key to {:?}", name, key_path);
174
175 Ok(keypair)
176 }
177}
178
179pub fn start(config: I2pConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
182 let name = config.name.clone();
183
184 thread::Builder::new()
185 .name(format!("i2p-coord-{}", config.interface_id.0))
186 .spawn(move || {
187 if let Err(e) = coordinator(config, tx, next_id) {
188 log::error!("[{}] I2P coordinator failed: {}", name, e);
189 }
190 })?;
191
192 Ok(())
193}
194
195fn coordinator(
197 config: I2pConfig,
198 tx: EventSender,
199 next_id: Arc<AtomicU64>,
200) -> Result<(), SamError> {
201 let ingress_control = config.ingress_control;
202 let sam_addr: SocketAddr = format!("{}:{}", config.sam_host, config.sam_port)
203 .parse()
204 .map_err(|e| SamError::Io(io::Error::new(io::ErrorKind::InvalidInput, e)))?;
205
206 let keypair = load_or_generate_keypair(&sam_addr, &config.storage_dir, &config.name)?;
208 let priv_b64 = sam::i2p_base64_encode(&keypair.private_key);
209
210 let session_id = sanitize_name(&config.name);
213
214 log::info!("[{}] creating SAM session (id={})", config.name, session_id);
215 let mut control_socket = sam::session_create(&sam_addr, &session_id, &priv_b64)?;
216
217 match sam::naming_lookup_on(&mut control_socket, "ME") {
220 Ok(our_dest) => {
221 let b32 = our_dest.base32_address();
222 log::info!("[{}] I2P address: {}", config.name, b32);
223 }
224 Err(e) => {
225 log::warn!("[{}] could not look up own destination: {}", config.name, e);
226 }
227 }
228
229 for peer_addr in &config.peers {
231 let peer_addr = peer_addr.trim().to_string();
232 if peer_addr.is_empty() {
233 continue;
234 }
235
236 let tx2 = tx.clone();
237 let next_id2 = next_id.clone();
238 let sam_addr2 = sam_addr;
239 let session_id2 = session_id.clone();
240 let iface_name = config.name.clone();
241 let runtime = Arc::clone(&config.runtime);
242
243 thread::Builder::new()
244 .name(format!("i2p-out-{}", peer_addr))
245 .spawn(move || {
246 outbound_peer_loop(
247 sam_addr2,
248 &session_id2,
249 &peer_addr,
250 &iface_name,
251 tx2,
252 next_id2,
253 runtime,
254 ingress_control,
255 );
256 })
257 .ok();
258 }
259
260 if config.connectable {
262 let tx2 = tx.clone();
263 let next_id2 = next_id.clone();
264 let sam_addr2 = sam_addr;
265 let session_id2 = session_id.clone();
266 let iface_name = config.name.clone();
267
268 thread::Builder::new()
269 .name("i2p-acceptor".into())
270 .spawn(move || {
271 acceptor_loop(
272 sam_addr2,
273 &session_id2,
274 &iface_name,
275 tx2,
276 next_id2,
277 ingress_control,
278 );
279 })
280 .ok();
281 }
282
283 let _keep_alive = control_socket;
286 loop {
287 thread::sleep(Duration::from_secs(3600));
288 }
289}
290
291fn outbound_peer_loop(
294 sam_addr: SocketAddr,
295 session_id: &str,
296 peer_addr: &str,
297 iface_name: &str,
298 tx: EventSender,
299 next_id: Arc<AtomicU64>,
300 runtime: Arc<Mutex<I2pRuntime>>,
301 ingress_control: rns_core::transport::types::IngressControlConfig,
302) {
303 loop {
304 log::info!("[{}] connecting to I2P peer {}", iface_name, peer_addr);
305
306 let destination = if peer_addr.ends_with(".i2p") {
308 match sam::naming_lookup(&sam_addr, peer_addr) {
309 Ok(dest) => dest.to_i2p_base64(),
310 Err(e) => {
311 log::warn!("[{}] failed to resolve {}: {}", iface_name, peer_addr, e);
312 thread::sleep(runtime.lock().unwrap().reconnect_wait);
313 continue;
314 }
315 }
316 } else {
317 peer_addr.to_string()
319 };
320
321 match sam::stream_connect(&sam_addr, session_id, &destination) {
323 Ok(stream) => {
324 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
325
326 log::info!(
327 "[{}] connected to I2P peer {} → id {}",
328 iface_name,
329 peer_addr,
330 client_id.0
331 );
332
333 let writer_stream = match stream.try_clone() {
335 Ok(s) => s,
336 Err(e) => {
337 log::warn!("[{}] failed to clone stream: {}", iface_name, e);
338 thread::sleep(runtime.lock().unwrap().reconnect_wait);
339 continue;
340 }
341 };
342
343 let writer: Box<dyn Writer> = Box::new(I2pWriter {
344 stream: writer_stream,
345 });
346
347 let info = InterfaceInfo {
348 id: client_id,
349 name: format!("I2PInterface/{}", peer_addr),
350 mode: constants::MODE_FULL,
351 out_capable: true,
352 in_capable: true,
353 bitrate: Some(BITRATE_GUESS),
354 announce_rate_target: None,
355 announce_rate_grace: 0,
356 announce_rate_penalty: 0.0,
357 announce_cap: constants::ANNOUNCE_CAP,
358 is_local_client: false,
359 wants_tunnel: false,
360 tunnel_id: None,
361 mtu: 65535,
362 ia_freq: 0.0,
363 started: 0.0,
364 ingress_control,
365 };
366
367 if tx
369 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
370 .is_err()
371 {
372 return; }
374
375 peer_reader_loop(stream, client_id, iface_name, &tx);
377
378 let _ = tx.send(Event::InterfaceDown(client_id));
380 log::warn!(
381 "[{}] I2P peer {} disconnected, reconnecting in {}s",
382 iface_name,
383 peer_addr,
384 runtime.lock().unwrap().reconnect_wait.as_secs()
385 );
386 }
387 Err(e) => {
388 log::warn!(
389 "[{}] failed to connect to I2P peer {}: {}",
390 iface_name,
391 peer_addr,
392 e
393 );
394 }
395 }
396
397 thread::sleep(runtime.lock().unwrap().reconnect_wait);
398 }
399}
400
401fn acceptor_loop(
403 sam_addr: SocketAddr,
404 session_id: &str,
405 iface_name: &str,
406 tx: EventSender,
407 next_id: Arc<AtomicU64>,
408 ingress_control: rns_core::transport::types::IngressControlConfig,
409) {
410 loop {
411 match sam::stream_accept(&sam_addr, session_id) {
412 Ok((stream, remote_dest)) => {
413 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
414 let remote_b32 = remote_dest.base32_address();
415
416 log::info!(
417 "[{}] accepted I2P connection from {} → id {}",
418 iface_name,
419 remote_b32,
420 client_id.0
421 );
422
423 let writer_stream = match stream.try_clone() {
425 Ok(s) => s,
426 Err(e) => {
427 log::warn!("[{}] failed to clone accepted stream: {}", iface_name, e);
428 continue;
429 }
430 };
431
432 let writer: Box<dyn Writer> = Box::new(I2pWriter {
433 stream: writer_stream,
434 });
435
436 let info = InterfaceInfo {
437 id: client_id,
438 name: format!("I2PInterface/{}", remote_b32),
439 mode: constants::MODE_FULL,
440 out_capable: true,
441 in_capable: true,
442 bitrate: Some(BITRATE_GUESS),
443 announce_rate_target: None,
444 announce_rate_grace: 0,
445 announce_rate_penalty: 0.0,
446 announce_cap: constants::ANNOUNCE_CAP,
447 is_local_client: false,
448 wants_tunnel: false,
449 tunnel_id: None,
450 mtu: 65535,
451 ia_freq: 0.0,
452 started: 0.0,
453 ingress_control,
454 };
455
456 if tx
458 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
459 .is_err()
460 {
461 return; }
463
464 let client_tx = tx.clone();
466 let client_name = iface_name.to_string();
467 thread::Builder::new()
468 .name(format!("i2p-client-{}", client_id.0))
469 .spawn(move || {
470 peer_reader_loop(stream, client_id, &client_name, &client_tx);
471 let _ = client_tx.send(Event::InterfaceDown(client_id));
472 })
473 .ok();
474 }
475 Err(e) => {
476 log::warn!("[{}] I2P accept failed: {}, retrying", iface_name, e);
477 thread::sleep(Duration::from_secs(1));
478 }
479 }
480 }
481}
482
483fn peer_reader_loop(
486 mut stream: std::net::TcpStream,
487 id: InterfaceId,
488 name: &str,
489 tx: &EventSender,
490) {
491 let mut decoder = hdlc::Decoder::new();
492 let mut buf = [0u8; 4096];
493
494 loop {
495 match stream.read(&mut buf) {
496 Ok(0) => {
497 log::info!("[{}] I2P peer {} disconnected", name, id.0);
498 return;
499 }
500 Ok(n) => {
501 for frame in decoder.feed(&buf[..n]) {
502 if tx
503 .send(Event::Frame {
504 interface_id: id,
505 data: frame,
506 })
507 .is_err()
508 {
509 return; }
511 }
512 }
513 Err(e) => {
514 log::warn!("[{}] I2P peer {} read error: {}", name, id.0, e);
515 return;
516 }
517 }
518 }
519}
520
521use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
524use std::collections::HashMap;
525
526pub struct I2pFactory;
528
529impl InterfaceFactory for I2pFactory {
530 fn type_name(&self) -> &str {
531 "I2PInterface"
532 }
533
534 fn parse_config(
535 &self,
536 name: &str,
537 id: InterfaceId,
538 params: &HashMap<String, String>,
539 ) -> Result<Box<dyn InterfaceConfigData>, String> {
540 let sam_host = params
541 .get("sam_host")
542 .cloned()
543 .unwrap_or_else(|| "127.0.0.1".into());
544
545 let sam_port = params
546 .get("sam_port")
547 .and_then(|v| v.parse::<u16>().ok())
548 .unwrap_or(7656);
549
550 let connectable = params
551 .get("connectable")
552 .and_then(|v| crate::config::parse_bool_pub(v))
553 .unwrap_or(false);
554
555 let peers = params
556 .get("peers")
557 .map(|v| {
558 v.split(',')
559 .map(|s| s.trim().to_string())
560 .filter(|s| !s.is_empty())
561 .collect::<Vec<String>>()
562 })
563 .unwrap_or_default();
564
565 let storage_dir = params
566 .get("storage_dir")
567 .map(PathBuf::from)
568 .unwrap_or_else(|| PathBuf::from("/tmp/rns-i2p"));
569
570 Ok(Box::new(I2pConfig {
571 name: name.to_string(),
572 interface_id: id,
573 sam_host,
574 sam_port,
575 connectable,
576 peers,
577 storage_dir,
578 ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
579 runtime: Arc::new(Mutex::new(I2pRuntime {
580 reconnect_wait: RECONNECT_WAIT,
581 })),
582 }))
583 }
584
585 fn start(
586 &self,
587 config: Box<dyn InterfaceConfigData>,
588 ctx: StartContext,
589 ) -> io::Result<StartResult> {
590 let mut cfg = *config
591 .into_any()
592 .downcast::<I2pConfig>()
593 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
594 cfg.ingress_control = ctx.ingress_control;
595 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
596 Ok(StartResult::Listener { control: None })
597 }
598}
599
600pub(crate) fn i2p_runtime_handle_from_config(config: &I2pConfig) -> I2pRuntimeConfigHandle {
601 I2pRuntimeConfigHandle {
602 interface_name: config.name.clone(),
603 runtime: Arc::clone(&config.runtime),
604 startup: I2pRuntime::from_config(config),
605 }
606}