1pub mod sam;
11
12use std::io::{self, Read, Write};
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::thread;
18use std::time::Duration;
19
20use rns_core::constants;
21use rns_core::transport::types::{InterfaceId, InterfaceInfo};
22
23use crate::event::{Event, EventSender};
24use crate::hdlc;
25use crate::interface::Writer;
26
27use self::sam::{Destination, SamError};
28
/// Presumably the hardware MTU of this interface type (inferred from the
/// name — TODO confirm).
///
/// Not referenced in the visible part of this file, hence the `dead_code`
/// allowance; the `InterfaceInfo` values built below use `mtu: 65535`.
#[allow(dead_code)]
const HW_MTU: usize = 1064;

/// Value reported as `InterfaceInfo::bitrate` for every I2P peer interface.
/// NOTE(review): presumably bits per second — confirm against `InterfaceInfo`.
const BITRATE_GUESS: u64 = 256_000;

/// Default for [`I2pRuntime::reconnect_wait`].
const RECONNECT_WAIT: Duration = Duration::from_secs(15);
38
/// Configuration of one I2P interface that reaches the I2P network through a
/// SAM bridge.
#[derive(Debug, Clone)]
pub struct I2pConfig {
    /// Interface name. Used as the log prefix and, after sanitizing, as the
    /// SAM session id and as part of the key file name.
    pub name: String,
    /// Id of this interface; used to name the coordinator thread.
    pub interface_id: InterfaceId,
    /// Host part of the SAM bridge address. It is joined with `sam_port` and
    /// parsed as a `SocketAddr`, so it has to be an IP literal.
    pub sam_host: String,
    /// Port part of the SAM bridge address.
    pub sam_port: u16,
    /// Remote peers to connect to; blank entries are skipped. Entries ending
    /// in `.i2p` are resolved through a SAM naming lookup, anything else is
    /// handed to SAM unchanged as the destination.
    pub peers: Vec<String>,
    /// When `true`, an acceptor thread is started for inbound connections.
    pub connectable: bool,
    /// Directory holding the persisted private key (`i2p_<name>.key`).
    pub storage_dir: PathBuf,
    /// Shared runtime settings; a clone of this `Arc` is handed to every
    /// outbound peer thread.
    pub runtime: Arc<Mutex<I2pRuntime>>,
}
56
/// Settings of an I2P interface that are read while it is running (they are
/// shared behind an `Arc<Mutex<_>>` in [`I2pConfig::runtime`]).
#[derive(Debug, Clone)]
pub struct I2pRuntime {
    /// How long an outbound peer thread sleeps before its next connection
    /// attempt.
    pub reconnect_wait: Duration,
}
61
62impl I2pRuntime {
63 pub fn from_config(_config: &I2pConfig) -> Self {
64 Self {
65 reconnect_wait: RECONNECT_WAIT,
66 }
67 }
68}
69
/// Handle onto the runtime settings of one I2P interface.
#[derive(Debug, Clone)]
pub struct I2pRuntimeConfigHandle {
    /// Name of the interface the settings belong to.
    pub interface_name: String,
    /// The live settings, shared with the interface's threads.
    pub runtime: Arc<Mutex<I2pRuntime>>,
    /// Snapshot of the settings as derived from the configuration at startup.
    pub startup: I2pRuntime,
}
76
77impl Default for I2pConfig {
78 fn default() -> Self {
79 let mut config = I2pConfig {
80 name: String::new(),
81 interface_id: InterfaceId(0),
82 sam_host: "127.0.0.1".into(),
83 sam_port: 7656,
84 peers: Vec::new(),
85 connectable: false,
86 storage_dir: PathBuf::from("."),
87 runtime: Arc::new(Mutex::new(I2pRuntime {
88 reconnect_wait: RECONNECT_WAIT,
89 })),
90 };
91 let startup = I2pRuntime::from_config(&config);
92 config.runtime = Arc::new(Mutex::new(startup));
93 config
94 }
95}
96
97struct I2pWriter {
99 stream: std::net::TcpStream,
100}
101
102impl Writer for I2pWriter {
103 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
104 self.stream.write_all(&hdlc::frame(data))
105 }
106}
107
/// Returns `name` with every character that is not alphanumeric, `-` or `_`
/// replaced by `_`.
///
/// "Alphanumeric" follows `char::is_alphanumeric`, so non-ASCII letters and
/// digits are kept as they are.
fn sanitize_name(name: &str) -> String {
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = ch == '-' || ch == '_' || ch.is_alphanumeric();
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
120
121fn key_file_path(storage_dir: &PathBuf, name: &str) -> PathBuf {
123 storage_dir.join(format!("i2p_{}.key", sanitize_name(name)))
124}
125
126fn load_or_generate_keypair(
129 sam_addr: &SocketAddr,
130 storage_dir: &PathBuf,
131 name: &str,
132) -> Result<sam::KeyPair, SamError> {
133 let key_path = key_file_path(storage_dir, name);
134
135 if key_path.exists() {
136 let priv_data = std::fs::read(&key_path).map_err(SamError::Io)?;
138
139 Ok(sam::KeyPair {
158 destination: Destination { data: Vec::new() }, private_key: priv_data,
160 })
161 } else {
162 log::info!("[{}] generating new I2P destination keypair", name);
164 let keypair = sam::dest_generate(sam_addr)?;
165
166 if let Some(parent) = key_path.parent() {
168 std::fs::create_dir_all(parent).map_err(SamError::Io)?;
169 }
170 std::fs::write(&key_path, &keypair.private_key).map_err(SamError::Io)?;
171 log::info!("[{}] saved I2P key to {:?}", name, key_path);
172
173 Ok(keypair)
174 }
175}
176
177pub fn start(config: I2pConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
180 let name = config.name.clone();
181
182 thread::Builder::new()
183 .name(format!("i2p-coord-{}", config.interface_id.0))
184 .spawn(move || {
185 if let Err(e) = coordinator(config, tx, next_id) {
186 log::error!("[{}] I2P coordinator failed: {}", name, e);
187 }
188 })?;
189
190 Ok(())
191}
192
193fn coordinator(
195 config: I2pConfig,
196 tx: EventSender,
197 next_id: Arc<AtomicU64>,
198) -> Result<(), SamError> {
199 let sam_addr: SocketAddr = format!("{}:{}", config.sam_host, config.sam_port)
200 .parse()
201 .map_err(|e| SamError::Io(io::Error::new(io::ErrorKind::InvalidInput, e)))?;
202
203 let keypair = load_or_generate_keypair(&sam_addr, &config.storage_dir, &config.name)?;
205 let priv_b64 = sam::i2p_base64_encode(&keypair.private_key);
206
207 let session_id = sanitize_name(&config.name);
210
211 log::info!("[{}] creating SAM session (id={})", config.name, session_id);
212 let mut control_socket = sam::session_create(&sam_addr, &session_id, &priv_b64)?;
213
214 match sam::naming_lookup_on(&mut control_socket, "ME") {
217 Ok(our_dest) => {
218 let b32 = our_dest.base32_address();
219 log::info!("[{}] I2P address: {}", config.name, b32);
220 }
221 Err(e) => {
222 log::warn!("[{}] could not look up own destination: {}", config.name, e);
223 }
224 }
225
226 for peer_addr in &config.peers {
228 let peer_addr = peer_addr.trim().to_string();
229 if peer_addr.is_empty() {
230 continue;
231 }
232
233 let tx2 = tx.clone();
234 let next_id2 = next_id.clone();
235 let sam_addr2 = sam_addr;
236 let session_id2 = session_id.clone();
237 let iface_name = config.name.clone();
238 let runtime = Arc::clone(&config.runtime);
239
240 thread::Builder::new()
241 .name(format!("i2p-out-{}", peer_addr))
242 .spawn(move || {
243 outbound_peer_loop(
244 sam_addr2,
245 &session_id2,
246 &peer_addr,
247 &iface_name,
248 tx2,
249 next_id2,
250 runtime,
251 );
252 })
253 .ok();
254 }
255
256 if config.connectable {
258 let tx2 = tx.clone();
259 let next_id2 = next_id.clone();
260 let sam_addr2 = sam_addr;
261 let session_id2 = session_id.clone();
262 let iface_name = config.name.clone();
263
264 thread::Builder::new()
265 .name("i2p-acceptor".into())
266 .spawn(move || {
267 acceptor_loop(sam_addr2, &session_id2, &iface_name, tx2, next_id2);
268 })
269 .ok();
270 }
271
272 let _keep_alive = control_socket;
275 loop {
276 thread::sleep(Duration::from_secs(3600));
277 }
278}
279
280fn outbound_peer_loop(
283 sam_addr: SocketAddr,
284 session_id: &str,
285 peer_addr: &str,
286 iface_name: &str,
287 tx: EventSender,
288 next_id: Arc<AtomicU64>,
289 runtime: Arc<Mutex<I2pRuntime>>,
290) {
291 loop {
292 log::info!("[{}] connecting to I2P peer {}", iface_name, peer_addr);
293
294 let destination = if peer_addr.ends_with(".i2p") {
296 match sam::naming_lookup(&sam_addr, peer_addr) {
297 Ok(dest) => dest.to_i2p_base64(),
298 Err(e) => {
299 log::warn!("[{}] failed to resolve {}: {}", iface_name, peer_addr, e);
300 thread::sleep(runtime.lock().unwrap().reconnect_wait);
301 continue;
302 }
303 }
304 } else {
305 peer_addr.to_string()
307 };
308
309 match sam::stream_connect(&sam_addr, session_id, &destination) {
311 Ok(stream) => {
312 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
313
314 log::info!(
315 "[{}] connected to I2P peer {} → id {}",
316 iface_name,
317 peer_addr,
318 client_id.0
319 );
320
321 let writer_stream = match stream.try_clone() {
323 Ok(s) => s,
324 Err(e) => {
325 log::warn!("[{}] failed to clone stream: {}", iface_name, e);
326 thread::sleep(runtime.lock().unwrap().reconnect_wait);
327 continue;
328 }
329 };
330
331 let writer: Box<dyn Writer> = Box::new(I2pWriter {
332 stream: writer_stream,
333 });
334
335 let info = InterfaceInfo {
336 id: client_id,
337 name: format!("I2PInterface/{}", peer_addr),
338 mode: constants::MODE_FULL,
339 out_capable: true,
340 in_capable: true,
341 bitrate: Some(BITRATE_GUESS),
342 announce_rate_target: None,
343 announce_rate_grace: 0,
344 announce_rate_penalty: 0.0,
345 announce_cap: constants::ANNOUNCE_CAP,
346 is_local_client: false,
347 wants_tunnel: false,
348 tunnel_id: None,
349 mtu: 65535,
350 ia_freq: 0.0,
351 started: 0.0,
352 ingress_control: true,
353 };
354
355 if tx
357 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
358 .is_err()
359 {
360 return; }
362
363 peer_reader_loop(stream, client_id, iface_name, &tx);
365
366 let _ = tx.send(Event::InterfaceDown(client_id));
368 log::warn!(
369 "[{}] I2P peer {} disconnected, reconnecting in {}s",
370 iface_name,
371 peer_addr,
372 runtime.lock().unwrap().reconnect_wait.as_secs()
373 );
374 }
375 Err(e) => {
376 log::warn!(
377 "[{}] failed to connect to I2P peer {}: {}",
378 iface_name,
379 peer_addr,
380 e
381 );
382 }
383 }
384
385 thread::sleep(runtime.lock().unwrap().reconnect_wait);
386 }
387}
388
389fn acceptor_loop(
391 sam_addr: SocketAddr,
392 session_id: &str,
393 iface_name: &str,
394 tx: EventSender,
395 next_id: Arc<AtomicU64>,
396) {
397 loop {
398 match sam::stream_accept(&sam_addr, session_id) {
399 Ok((stream, remote_dest)) => {
400 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
401 let remote_b32 = remote_dest.base32_address();
402
403 log::info!(
404 "[{}] accepted I2P connection from {} → id {}",
405 iface_name,
406 remote_b32,
407 client_id.0
408 );
409
410 let writer_stream = match stream.try_clone() {
412 Ok(s) => s,
413 Err(e) => {
414 log::warn!("[{}] failed to clone accepted stream: {}", iface_name, e);
415 continue;
416 }
417 };
418
419 let writer: Box<dyn Writer> = Box::new(I2pWriter {
420 stream: writer_stream,
421 });
422
423 let info = InterfaceInfo {
424 id: client_id,
425 name: format!("I2PInterface/{}", remote_b32),
426 mode: constants::MODE_FULL,
427 out_capable: true,
428 in_capable: true,
429 bitrate: Some(BITRATE_GUESS),
430 announce_rate_target: None,
431 announce_rate_grace: 0,
432 announce_rate_penalty: 0.0,
433 announce_cap: constants::ANNOUNCE_CAP,
434 is_local_client: false,
435 wants_tunnel: false,
436 tunnel_id: None,
437 mtu: 65535,
438 ia_freq: 0.0,
439 started: 0.0,
440 ingress_control: true,
441 };
442
443 if tx
445 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
446 .is_err()
447 {
448 return; }
450
451 let client_tx = tx.clone();
453 let client_name = iface_name.to_string();
454 thread::Builder::new()
455 .name(format!("i2p-client-{}", client_id.0))
456 .spawn(move || {
457 peer_reader_loop(stream, client_id, &client_name, &client_tx);
458 let _ = client_tx.send(Event::InterfaceDown(client_id));
459 })
460 .ok();
461 }
462 Err(e) => {
463 log::warn!("[{}] I2P accept failed: {}, retrying", iface_name, e);
464 thread::sleep(Duration::from_secs(1));
465 }
466 }
467 }
468}
469
470fn peer_reader_loop(
473 mut stream: std::net::TcpStream,
474 id: InterfaceId,
475 name: &str,
476 tx: &EventSender,
477) {
478 let mut decoder = hdlc::Decoder::new();
479 let mut buf = [0u8; 4096];
480
481 loop {
482 match stream.read(&mut buf) {
483 Ok(0) => {
484 log::info!("[{}] I2P peer {} disconnected", name, id.0);
485 return;
486 }
487 Ok(n) => {
488 for frame in decoder.feed(&buf[..n]) {
489 if tx
490 .send(Event::Frame {
491 interface_id: id,
492 data: frame,
493 })
494 .is_err()
495 {
496 return; }
498 }
499 }
500 Err(e) => {
501 log::warn!("[{}] I2P peer {} read error: {}", name, id.0, e);
502 return;
503 }
504 }
505 }
506}
507
508use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
511use std::collections::HashMap;
512
513pub struct I2pFactory;
515
516impl InterfaceFactory for I2pFactory {
517 fn type_name(&self) -> &str {
518 "I2PInterface"
519 }
520
521 fn parse_config(
522 &self,
523 name: &str,
524 id: InterfaceId,
525 params: &HashMap<String, String>,
526 ) -> Result<Box<dyn InterfaceConfigData>, String> {
527 let sam_host = params
528 .get("sam_host")
529 .cloned()
530 .unwrap_or_else(|| "127.0.0.1".into());
531
532 let sam_port = params
533 .get("sam_port")
534 .and_then(|v| v.parse::<u16>().ok())
535 .unwrap_or(7656);
536
537 let connectable = params
538 .get("connectable")
539 .and_then(|v| crate::config::parse_bool_pub(v))
540 .unwrap_or(false);
541
542 let peers = params
543 .get("peers")
544 .map(|v| {
545 v.split(',')
546 .map(|s| s.trim().to_string())
547 .filter(|s| !s.is_empty())
548 .collect::<Vec<String>>()
549 })
550 .unwrap_or_default();
551
552 let storage_dir = params
553 .get("storage_dir")
554 .map(PathBuf::from)
555 .unwrap_or_else(|| PathBuf::from("/tmp/rns-i2p"));
556
557 Ok(Box::new(I2pConfig {
558 name: name.to_string(),
559 interface_id: id,
560 sam_host,
561 sam_port,
562 connectable,
563 peers,
564 storage_dir,
565 runtime: Arc::new(Mutex::new(I2pRuntime {
566 reconnect_wait: RECONNECT_WAIT,
567 })),
568 }))
569 }
570
571 fn start(
572 &self,
573 config: Box<dyn InterfaceConfigData>,
574 ctx: StartContext,
575 ) -> io::Result<StartResult> {
576 let cfg = *config
577 .into_any()
578 .downcast::<I2pConfig>()
579 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
580 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
581 Ok(StartResult::Listener { control: None })
582 }
583}
584
585pub(crate) fn i2p_runtime_handle_from_config(config: &I2pConfig) -> I2pRuntimeConfigHandle {
586 I2pRuntimeConfigHandle {
587 interface_name: config.name.clone(),
588 runtime: Arc::clone(&config.runtime),
589 startup: I2pRuntime::from_config(config),
590 }
591}