1pub mod sam;
11
12use std::io::{self, Read, Write};
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17use std::thread;
18use std::time::Duration;
19
20use rns_core::constants;
21use rns_core::transport::types::{InterfaceId, InterfaceInfo};
22
23use crate::event::{Event, EventSender};
24use crate::hdlc;
25use crate::interface::Writer;
26
27use self::sam::{Destination, SamError};
28
29#[allow(dead_code)]
31const HW_MTU: usize = 1064;
32
33const BITRATE_GUESS: u64 = 256_000;
35
36const RECONNECT_WAIT: Duration = Duration::from_secs(15);
38
39#[derive(Debug, Clone)]
41pub struct I2pConfig {
42 pub name: String,
43 pub interface_id: InterfaceId,
44 pub sam_host: String,
46 pub sam_port: u16,
48 pub peers: Vec<String>,
50 pub connectable: bool,
52 pub storage_dir: PathBuf,
54}
55
56impl Default for I2pConfig {
57 fn default() -> Self {
58 I2pConfig {
59 name: String::new(),
60 interface_id: InterfaceId(0),
61 sam_host: "127.0.0.1".into(),
62 sam_port: 7656,
63 peers: Vec::new(),
64 connectable: false,
65 storage_dir: PathBuf::from("."),
66 }
67 }
68}
69
70struct I2pWriter {
72 stream: std::net::TcpStream,
73}
74
75impl Writer for I2pWriter {
76 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
77 self.stream.write_all(&hdlc::frame(data))
78 }
79}
80
81fn sanitize_name(name: &str) -> String {
83 name.chars()
84 .map(|c| {
85 if c.is_alphanumeric() || c == '-' || c == '_' {
86 c
87 } else {
88 '_'
89 }
90 })
91 .collect()
92}
93
94fn key_file_path(storage_dir: &PathBuf, name: &str) -> PathBuf {
96 storage_dir.join(format!("i2p_{}.key", sanitize_name(name)))
97}
98
99fn load_or_generate_keypair(
102 sam_addr: &SocketAddr,
103 storage_dir: &PathBuf,
104 name: &str,
105) -> Result<sam::KeyPair, SamError> {
106 let key_path = key_file_path(storage_dir, name);
107
108 if key_path.exists() {
109 let priv_data = std::fs::read(&key_path).map_err(SamError::Io)?;
111
112 Ok(sam::KeyPair {
131 destination: Destination { data: Vec::new() }, private_key: priv_data,
133 })
134 } else {
135 log::info!("[{}] generating new I2P destination keypair", name);
137 let keypair = sam::dest_generate(sam_addr)?;
138
139 if let Some(parent) = key_path.parent() {
141 std::fs::create_dir_all(parent).map_err(SamError::Io)?;
142 }
143 std::fs::write(&key_path, &keypair.private_key).map_err(SamError::Io)?;
144 log::info!("[{}] saved I2P key to {:?}", name, key_path);
145
146 Ok(keypair)
147 }
148}
149
150pub fn start(config: I2pConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
153 let name = config.name.clone();
154
155 thread::Builder::new()
156 .name(format!("i2p-coord-{}", config.interface_id.0))
157 .spawn(move || {
158 if let Err(e) = coordinator(config, tx, next_id) {
159 log::error!("[{}] I2P coordinator failed: {}", name, e);
160 }
161 })?;
162
163 Ok(())
164}
165
166fn coordinator(
168 config: I2pConfig,
169 tx: EventSender,
170 next_id: Arc<AtomicU64>,
171) -> Result<(), SamError> {
172 let sam_addr: SocketAddr = format!("{}:{}", config.sam_host, config.sam_port)
173 .parse()
174 .map_err(|e| SamError::Io(io::Error::new(io::ErrorKind::InvalidInput, e)))?;
175
176 let keypair = load_or_generate_keypair(&sam_addr, &config.storage_dir, &config.name)?;
178 let priv_b64 = sam::i2p_base64_encode(&keypair.private_key);
179
180 let session_id = sanitize_name(&config.name);
183
184 log::info!("[{}] creating SAM session (id={})", config.name, session_id);
185 let mut control_socket = sam::session_create(&sam_addr, &session_id, &priv_b64)?;
186
187 match sam::naming_lookup_on(&mut control_socket, "ME") {
190 Ok(our_dest) => {
191 let b32 = our_dest.base32_address();
192 log::info!("[{}] I2P address: {}", config.name, b32);
193 }
194 Err(e) => {
195 log::warn!("[{}] could not look up own destination: {}", config.name, e);
196 }
197 }
198
199 for peer_addr in &config.peers {
201 let peer_addr = peer_addr.trim().to_string();
202 if peer_addr.is_empty() {
203 continue;
204 }
205
206 let tx2 = tx.clone();
207 let next_id2 = next_id.clone();
208 let sam_addr2 = sam_addr;
209 let session_id2 = session_id.clone();
210 let iface_name = config.name.clone();
211
212 thread::Builder::new()
213 .name(format!("i2p-out-{}", peer_addr))
214 .spawn(move || {
215 outbound_peer_loop(
216 sam_addr2,
217 &session_id2,
218 &peer_addr,
219 &iface_name,
220 tx2,
221 next_id2,
222 );
223 })
224 .ok();
225 }
226
227 if config.connectable {
229 let tx2 = tx.clone();
230 let next_id2 = next_id.clone();
231 let sam_addr2 = sam_addr;
232 let session_id2 = session_id.clone();
233 let iface_name = config.name.clone();
234
235 thread::Builder::new()
236 .name("i2p-acceptor".into())
237 .spawn(move || {
238 acceptor_loop(sam_addr2, &session_id2, &iface_name, tx2, next_id2);
239 })
240 .ok();
241 }
242
243 let _keep_alive = control_socket;
246 loop {
247 thread::sleep(Duration::from_secs(3600));
248 }
249}
250
251fn outbound_peer_loop(
254 sam_addr: SocketAddr,
255 session_id: &str,
256 peer_addr: &str,
257 iface_name: &str,
258 tx: EventSender,
259 next_id: Arc<AtomicU64>,
260) {
261 loop {
262 log::info!("[{}] connecting to I2P peer {}", iface_name, peer_addr);
263
264 let destination = if peer_addr.ends_with(".i2p") {
266 match sam::naming_lookup(&sam_addr, peer_addr) {
267 Ok(dest) => dest.to_i2p_base64(),
268 Err(e) => {
269 log::warn!("[{}] failed to resolve {}: {}", iface_name, peer_addr, e);
270 thread::sleep(RECONNECT_WAIT);
271 continue;
272 }
273 }
274 } else {
275 peer_addr.to_string()
277 };
278
279 match sam::stream_connect(&sam_addr, session_id, &destination) {
281 Ok(stream) => {
282 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
283
284 log::info!(
285 "[{}] connected to I2P peer {} → id {}",
286 iface_name,
287 peer_addr,
288 client_id.0
289 );
290
291 let writer_stream = match stream.try_clone() {
293 Ok(s) => s,
294 Err(e) => {
295 log::warn!("[{}] failed to clone stream: {}", iface_name, e);
296 thread::sleep(RECONNECT_WAIT);
297 continue;
298 }
299 };
300
301 let writer: Box<dyn Writer> = Box::new(I2pWriter {
302 stream: writer_stream,
303 });
304
305 let info = InterfaceInfo {
306 id: client_id,
307 name: format!("I2PInterface/{}", peer_addr),
308 mode: constants::MODE_FULL,
309 out_capable: true,
310 in_capable: true,
311 bitrate: Some(BITRATE_GUESS),
312 announce_rate_target: None,
313 announce_rate_grace: 0,
314 announce_rate_penalty: 0.0,
315 announce_cap: constants::ANNOUNCE_CAP,
316 is_local_client: false,
317 wants_tunnel: false,
318 tunnel_id: None,
319 mtu: 65535,
320 ia_freq: 0.0,
321 started: 0.0,
322 ingress_control: true,
323 };
324
325 if tx
327 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
328 .is_err()
329 {
330 return; }
332
333 peer_reader_loop(stream, client_id, iface_name, &tx);
335
336 let _ = tx.send(Event::InterfaceDown(client_id));
338 log::warn!(
339 "[{}] I2P peer {} disconnected, reconnecting in {}s",
340 iface_name,
341 peer_addr,
342 RECONNECT_WAIT.as_secs()
343 );
344 }
345 Err(e) => {
346 log::warn!(
347 "[{}] failed to connect to I2P peer {}: {}",
348 iface_name,
349 peer_addr,
350 e
351 );
352 }
353 }
354
355 thread::sleep(RECONNECT_WAIT);
356 }
357}
358
359fn acceptor_loop(
361 sam_addr: SocketAddr,
362 session_id: &str,
363 iface_name: &str,
364 tx: EventSender,
365 next_id: Arc<AtomicU64>,
366) {
367 loop {
368 match sam::stream_accept(&sam_addr, session_id) {
369 Ok((stream, remote_dest)) => {
370 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
371 let remote_b32 = remote_dest.base32_address();
372
373 log::info!(
374 "[{}] accepted I2P connection from {} → id {}",
375 iface_name,
376 remote_b32,
377 client_id.0
378 );
379
380 let writer_stream = match stream.try_clone() {
382 Ok(s) => s,
383 Err(e) => {
384 log::warn!("[{}] failed to clone accepted stream: {}", iface_name, e);
385 continue;
386 }
387 };
388
389 let writer: Box<dyn Writer> = Box::new(I2pWriter {
390 stream: writer_stream,
391 });
392
393 let info = InterfaceInfo {
394 id: client_id,
395 name: format!("I2PInterface/{}", remote_b32),
396 mode: constants::MODE_FULL,
397 out_capable: true,
398 in_capable: true,
399 bitrate: Some(BITRATE_GUESS),
400 announce_rate_target: None,
401 announce_rate_grace: 0,
402 announce_rate_penalty: 0.0,
403 announce_cap: constants::ANNOUNCE_CAP,
404 is_local_client: false,
405 wants_tunnel: false,
406 tunnel_id: None,
407 mtu: 65535,
408 ia_freq: 0.0,
409 started: 0.0,
410 ingress_control: true,
411 };
412
413 if tx
415 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
416 .is_err()
417 {
418 return; }
420
421 let client_tx = tx.clone();
423 let client_name = iface_name.to_string();
424 thread::Builder::new()
425 .name(format!("i2p-client-{}", client_id.0))
426 .spawn(move || {
427 peer_reader_loop(stream, client_id, &client_name, &client_tx);
428 let _ = client_tx.send(Event::InterfaceDown(client_id));
429 })
430 .ok();
431 }
432 Err(e) => {
433 log::warn!("[{}] I2P accept failed: {}, retrying", iface_name, e);
434 thread::sleep(Duration::from_secs(1));
435 }
436 }
437 }
438}
439
440fn peer_reader_loop(
443 mut stream: std::net::TcpStream,
444 id: InterfaceId,
445 name: &str,
446 tx: &EventSender,
447) {
448 let mut decoder = hdlc::Decoder::new();
449 let mut buf = [0u8; 4096];
450
451 loop {
452 match stream.read(&mut buf) {
453 Ok(0) => {
454 log::info!("[{}] I2P peer {} disconnected", name, id.0);
455 return;
456 }
457 Ok(n) => {
458 for frame in decoder.feed(&buf[..n]) {
459 if tx
460 .send(Event::Frame {
461 interface_id: id,
462 data: frame,
463 })
464 .is_err()
465 {
466 return; }
468 }
469 }
470 Err(e) => {
471 log::warn!("[{}] I2P peer {} read error: {}", name, id.0, e);
472 return;
473 }
474 }
475 }
476}
477
478use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
481use std::collections::HashMap;
482
483pub struct I2pFactory;
485
486impl InterfaceFactory for I2pFactory {
487 fn type_name(&self) -> &str {
488 "I2PInterface"
489 }
490
491 fn parse_config(
492 &self,
493 name: &str,
494 id: InterfaceId,
495 params: &HashMap<String, String>,
496 ) -> Result<Box<dyn InterfaceConfigData>, String> {
497 let sam_host = params
498 .get("sam_host")
499 .cloned()
500 .unwrap_or_else(|| "127.0.0.1".into());
501
502 let sam_port = params
503 .get("sam_port")
504 .and_then(|v| v.parse::<u16>().ok())
505 .unwrap_or(7656);
506
507 let connectable = params
508 .get("connectable")
509 .and_then(|v| crate::config::parse_bool_pub(v))
510 .unwrap_or(false);
511
512 let peers = params
513 .get("peers")
514 .map(|v| {
515 v.split(',')
516 .map(|s| s.trim().to_string())
517 .filter(|s| !s.is_empty())
518 .collect::<Vec<String>>()
519 })
520 .unwrap_or_default();
521
522 let storage_dir = params
523 .get("storage_dir")
524 .map(PathBuf::from)
525 .unwrap_or_else(|| PathBuf::from("/tmp/rns-i2p"));
526
527 Ok(Box::new(I2pConfig {
528 name: name.to_string(),
529 interface_id: id,
530 sam_host,
531 sam_port,
532 connectable,
533 peers,
534 storage_dir,
535 }))
536 }
537
538 fn start(
539 &self,
540 config: Box<dyn InterfaceConfigData>,
541 ctx: StartContext,
542 ) -> io::Result<StartResult> {
543 let cfg = *config
544 .into_any()
545 .downcast::<I2pConfig>()
546 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
547 start(cfg, ctx.tx, ctx.next_dynamic_id)?;
548 Ok(StartResult::Listener)
549 }
550}