1pub mod sam;
11
12use std::io::{self, Read, Write};
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17use std::thread;
18use std::time::Duration;
19
20use rns_core::constants;
21use rns_core::transport::types::{InterfaceId, InterfaceInfo};
22
23use crate::event::{Event, EventSender};
24use crate::hdlc;
25use crate::interface::Writer;
26
27use self::sam::{Destination, SamError};
28
/// Hardware MTU of the I2P interface.
///
/// NOTE(review): nothing in the visible part of this file reads this constant
/// (peer interfaces are registered with `mtu: 65535`), which is presumably why
/// the `dead_code` lint is silenced here — confirm before removing either.
#[allow(dead_code)]
const HW_MTU: usize = 1064;

/// Bitrate reported in the `InterfaceInfo` of every I2P peer interface.
/// As the name says, this is a guess rather than a measured value.
const BITRATE_GUESS: u64 = 256_000;

/// Pause before an outbound peer connection is attempted again.
const RECONNECT_WAIT: Duration = Duration::from_secs(15);
38
#[derive(Debug, Clone)]
/// Configuration for one I2P interface.
pub struct I2pConfig {
    /// Interface name. Used as the prefix of log messages and, after
    /// sanitizing, as the SAM session id and as part of the key file name.
    pub name: String,
    /// Identifier of this interface; its numeric value names the coordinator thread.
    pub interface_id: InterfaceId,
    /// Host part of the SAM bridge address.
    pub sam_host: String,
    /// Port part of the SAM bridge address.
    pub sam_port: u16,
    /// Peers to connect out to. Entries are trimmed and blank ones skipped;
    /// entries ending in `.i2p` are resolved through a SAM naming lookup,
    /// anything else is handed to SAM unchanged as the destination.
    pub peers: Vec<String>,
    /// When `true`, an acceptor thread is started for inbound connections.
    pub connectable: bool,
    /// Directory in which the interface's private key file is stored.
    pub storage_dir: PathBuf,
}
55
56impl Default for I2pConfig {
57 fn default() -> Self {
58 I2pConfig {
59 name: String::new(),
60 interface_id: InterfaceId(0),
61 sam_host: "127.0.0.1".into(),
62 sam_port: 7656,
63 peers: Vec::new(),
64 connectable: false,
65 storage_dir: PathBuf::from("."),
66 }
67 }
68}
69
70struct I2pWriter {
72 stream: std::net::TcpStream,
73}
74
75impl Writer for I2pWriter {
76 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
77 self.stream.write_all(&hdlc::frame(data))
78 }
79}
80
/// Returns `name` with every character that is not alphanumeric, `-` or `_`
/// replaced by `_`.
///
/// "Alphanumeric" follows `char::is_alphanumeric`, so non-ASCII letters and
/// digits are kept as they are.
fn sanitize_name(name: &str) -> String {
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, '-' | '_') || ch.is_alphanumeric();
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
87
88fn key_file_path(storage_dir: &PathBuf, name: &str) -> PathBuf {
90 storage_dir.join(format!("i2p_{}.key", sanitize_name(name)))
91}
92
93fn load_or_generate_keypair(
96 sam_addr: &SocketAddr,
97 storage_dir: &PathBuf,
98 name: &str,
99) -> Result<sam::KeyPair, SamError> {
100 let key_path = key_file_path(storage_dir, name);
101
102 if key_path.exists() {
103 let priv_data = std::fs::read(&key_path).map_err(SamError::Io)?;
105
106 Ok(sam::KeyPair {
125 destination: Destination { data: Vec::new() }, private_key: priv_data,
127 })
128 } else {
129 log::info!("[{}] generating new I2P destination keypair", name);
131 let keypair = sam::dest_generate(sam_addr)?;
132
133 if let Some(parent) = key_path.parent() {
135 std::fs::create_dir_all(parent).map_err(SamError::Io)?;
136 }
137 std::fs::write(&key_path, &keypair.private_key).map_err(SamError::Io)?;
138 log::info!("[{}] saved I2P key to {:?}", name, key_path);
139
140 Ok(keypair)
141 }
142}
143
144pub fn start(
147 config: I2pConfig,
148 tx: EventSender,
149 next_id: Arc<AtomicU64>,
150) -> io::Result<()> {
151 let name = config.name.clone();
152
153 thread::Builder::new()
154 .name(format!("i2p-coord-{}", config.interface_id.0))
155 .spawn(move || {
156 if let Err(e) = coordinator(config, tx, next_id) {
157 log::error!("[{}] I2P coordinator failed: {}", name, e);
158 }
159 })?;
160
161 Ok(())
162}
163
164fn coordinator(
166 config: I2pConfig,
167 tx: EventSender,
168 next_id: Arc<AtomicU64>,
169) -> Result<(), SamError> {
170 let sam_addr: SocketAddr = format!("{}:{}", config.sam_host, config.sam_port)
171 .parse()
172 .map_err(|e| SamError::Io(io::Error::new(io::ErrorKind::InvalidInput, e)))?;
173
174 let keypair = load_or_generate_keypair(&sam_addr, &config.storage_dir, &config.name)?;
176 let priv_b64 = sam::i2p_base64_encode(&keypair.private_key);
177
178 let session_id = sanitize_name(&config.name);
181
182 log::info!("[{}] creating SAM session (id={})", config.name, session_id);
183 let mut control_socket = sam::session_create(&sam_addr, &session_id, &priv_b64)?;
184
185 match sam::naming_lookup_on(&mut control_socket, "ME") {
188 Ok(our_dest) => {
189 let b32 = our_dest.base32_address();
190 log::info!("[{}] I2P address: {}", config.name, b32);
191 }
192 Err(e) => {
193 log::warn!("[{}] could not look up own destination: {}", config.name, e);
194 }
195 }
196
197 for peer_addr in &config.peers {
199 let peer_addr = peer_addr.trim().to_string();
200 if peer_addr.is_empty() {
201 continue;
202 }
203
204 let tx2 = tx.clone();
205 let next_id2 = next_id.clone();
206 let sam_addr2 = sam_addr;
207 let session_id2 = session_id.clone();
208 let iface_name = config.name.clone();
209
210 thread::Builder::new()
211 .name(format!("i2p-out-{}", peer_addr))
212 .spawn(move || {
213 outbound_peer_loop(
214 sam_addr2,
215 &session_id2,
216 &peer_addr,
217 &iface_name,
218 tx2,
219 next_id2,
220 );
221 })
222 .ok();
223 }
224
225 if config.connectable {
227 let tx2 = tx.clone();
228 let next_id2 = next_id.clone();
229 let sam_addr2 = sam_addr;
230 let session_id2 = session_id.clone();
231 let iface_name = config.name.clone();
232
233 thread::Builder::new()
234 .name("i2p-acceptor".into())
235 .spawn(move || {
236 acceptor_loop(sam_addr2, &session_id2, &iface_name, tx2, next_id2);
237 })
238 .ok();
239 }
240
241 let _keep_alive = control_socket;
244 loop {
245 thread::sleep(Duration::from_secs(3600));
246 }
247}
248
249fn outbound_peer_loop(
252 sam_addr: SocketAddr,
253 session_id: &str,
254 peer_addr: &str,
255 iface_name: &str,
256 tx: EventSender,
257 next_id: Arc<AtomicU64>,
258) {
259 loop {
260 log::info!("[{}] connecting to I2P peer {}", iface_name, peer_addr);
261
262 let destination = if peer_addr.ends_with(".i2p") {
264 match sam::naming_lookup(&sam_addr, peer_addr) {
265 Ok(dest) => dest.to_i2p_base64(),
266 Err(e) => {
267 log::warn!(
268 "[{}] failed to resolve {}: {}",
269 iface_name, peer_addr, e
270 );
271 thread::sleep(RECONNECT_WAIT);
272 continue;
273 }
274 }
275 } else {
276 peer_addr.to_string()
278 };
279
280 match sam::stream_connect(&sam_addr, session_id, &destination) {
282 Ok(stream) => {
283 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
284
285 log::info!(
286 "[{}] connected to I2P peer {} → id {}",
287 iface_name, peer_addr, client_id.0
288 );
289
290 let writer_stream = match stream.try_clone() {
292 Ok(s) => s,
293 Err(e) => {
294 log::warn!("[{}] failed to clone stream: {}", iface_name, e);
295 thread::sleep(RECONNECT_WAIT);
296 continue;
297 }
298 };
299
300 let writer: Box<dyn Writer> = Box::new(I2pWriter { stream: writer_stream });
301
302 let info = InterfaceInfo {
303 id: client_id,
304 name: format!("I2PInterface/{}", peer_addr),
305 mode: constants::MODE_FULL,
306 out_capable: true,
307 in_capable: true,
308 bitrate: Some(BITRATE_GUESS),
309 announce_rate_target: None,
310 announce_rate_grace: 0,
311 announce_rate_penalty: 0.0,
312 announce_cap: constants::ANNOUNCE_CAP,
313 is_local_client: false,
314 wants_tunnel: false,
315 tunnel_id: None,
316 mtu: 65535,
317 ia_freq: 0.0,
318 started: 0.0,
319 ingress_control: true,
320 };
321
322 if tx
324 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
325 .is_err()
326 {
327 return; }
329
330 peer_reader_loop(stream, client_id, iface_name, &tx);
332
333 let _ = tx.send(Event::InterfaceDown(client_id));
335 log::warn!(
336 "[{}] I2P peer {} disconnected, reconnecting in {}s",
337 iface_name,
338 peer_addr,
339 RECONNECT_WAIT.as_secs()
340 );
341 }
342 Err(e) => {
343 log::warn!(
344 "[{}] failed to connect to I2P peer {}: {}",
345 iface_name, peer_addr, e
346 );
347 }
348 }
349
350 thread::sleep(RECONNECT_WAIT);
351 }
352}
353
354fn acceptor_loop(
356 sam_addr: SocketAddr,
357 session_id: &str,
358 iface_name: &str,
359 tx: EventSender,
360 next_id: Arc<AtomicU64>,
361) {
362 loop {
363 match sam::stream_accept(&sam_addr, session_id) {
364 Ok((stream, remote_dest)) => {
365 let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
366 let remote_b32 = remote_dest.base32_address();
367
368 log::info!(
369 "[{}] accepted I2P connection from {} → id {}",
370 iface_name, remote_b32, client_id.0
371 );
372
373 let writer_stream = match stream.try_clone() {
375 Ok(s) => s,
376 Err(e) => {
377 log::warn!("[{}] failed to clone accepted stream: {}", iface_name, e);
378 continue;
379 }
380 };
381
382 let writer: Box<dyn Writer> = Box::new(I2pWriter { stream: writer_stream });
383
384 let info = InterfaceInfo {
385 id: client_id,
386 name: format!("I2PInterface/{}", remote_b32),
387 mode: constants::MODE_FULL,
388 out_capable: true,
389 in_capable: true,
390 bitrate: Some(BITRATE_GUESS),
391 announce_rate_target: None,
392 announce_rate_grace: 0,
393 announce_rate_penalty: 0.0,
394 announce_cap: constants::ANNOUNCE_CAP,
395 is_local_client: false,
396 wants_tunnel: false,
397 tunnel_id: None,
398 mtu: 65535,
399 ia_freq: 0.0,
400 started: 0.0,
401 ingress_control: true,
402 };
403
404 if tx
406 .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
407 .is_err()
408 {
409 return; }
411
412 let client_tx = tx.clone();
414 let client_name = iface_name.to_string();
415 thread::Builder::new()
416 .name(format!("i2p-client-{}", client_id.0))
417 .spawn(move || {
418 peer_reader_loop(stream, client_id, &client_name, &client_tx);
419 let _ = client_tx.send(Event::InterfaceDown(client_id));
420 })
421 .ok();
422 }
423 Err(e) => {
424 log::warn!("[{}] I2P accept failed: {}, retrying", iface_name, e);
425 thread::sleep(Duration::from_secs(1));
426 }
427 }
428 }
429}
430
431fn peer_reader_loop(
434 mut stream: std::net::TcpStream,
435 id: InterfaceId,
436 name: &str,
437 tx: &EventSender,
438) {
439 let mut decoder = hdlc::Decoder::new();
440 let mut buf = [0u8; 4096];
441
442 loop {
443 match stream.read(&mut buf) {
444 Ok(0) => {
445 log::info!("[{}] I2P peer {} disconnected", name, id.0);
446 return;
447 }
448 Ok(n) => {
449 for frame in decoder.feed(&buf[..n]) {
450 if tx
451 .send(Event::Frame {
452 interface_id: id,
453 data: frame,
454 })
455 .is_err()
456 {
457 return; }
459 }
460 }
461 Err(e) => {
462 log::warn!("[{}] I2P peer {} read error: {}", name, id.0, e);
463 return;
464 }
465 }
466 }
467}