Skip to main content

rns_net/interface/i2p/
mod.rs

1//! I2P interface using the SAM v3.1 protocol.
2//!
3//! Provides anonymous transport over the I2P network. The interface manages
4//! both outbound peer connections and inbound connection acceptance.
5//! Each peer connection becomes a dynamic interface with HDLC framing,
6//! similar to TCP server client connections.
7//!
8//! Matches Python `I2PInterface` from `I2PInterface.py`.
9
10pub mod sam;
11
12use std::io::{self, Read, Write};
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::thread;
18use std::time::Duration;
19
20use rns_core::constants;
21use rns_core::transport::types::{InterfaceId, InterfaceInfo};
22
23use crate::event::{Event, EventSender};
24use crate::hdlc;
25use crate::interface::Writer;
26
27use self::sam::{Destination, SamError};
28
29/// Hardware MTU for I2P streams (matches Python I2PInterface).
30#[allow(dead_code)]
31const HW_MTU: usize = 1064;
32
33/// Estimated bitrate for I2P tunnels (256 kbps, matches Python).
34const BITRATE_GUESS: u64 = 256_000;
35
36/// Wait time before reconnecting to an outbound peer.
37const RECONNECT_WAIT: Duration = Duration::from_secs(15);
38
39/// Configuration for the I2P interface.
40#[derive(Debug, Clone)]
41pub struct I2pConfig {
42    pub name: String,
43    pub interface_id: InterfaceId,
44    /// SAM bridge host (default "127.0.0.1").
45    pub sam_host: String,
46    /// SAM bridge port (default 7656).
47    pub sam_port: u16,
48    /// List of .b32.i2p peer addresses (or full base64 destinations) to connect to.
49    pub peers: Vec<String>,
50    /// Whether to accept inbound connections.
51    pub connectable: bool,
52    /// Directory for key persistence (typically `{config_dir}/storage`).
53    pub storage_dir: PathBuf,
54    pub runtime: Arc<Mutex<I2pRuntime>>,
55}
56
57#[derive(Debug, Clone)]
58pub struct I2pRuntime {
59    pub reconnect_wait: Duration,
60}
61
62impl I2pRuntime {
63    pub fn from_config(_config: &I2pConfig) -> Self {
64        Self {
65            reconnect_wait: RECONNECT_WAIT,
66        }
67    }
68}
69
70#[derive(Debug, Clone)]
71pub struct I2pRuntimeConfigHandle {
72    pub interface_name: String,
73    pub runtime: Arc<Mutex<I2pRuntime>>,
74    pub startup: I2pRuntime,
75}
76
77impl Default for I2pConfig {
78    fn default() -> Self {
79        let mut config = I2pConfig {
80            name: String::new(),
81            interface_id: InterfaceId(0),
82            sam_host: "127.0.0.1".into(),
83            sam_port: 7656,
84            peers: Vec::new(),
85            connectable: false,
86            storage_dir: PathBuf::from("."),
87            runtime: Arc::new(Mutex::new(I2pRuntime {
88                reconnect_wait: RECONNECT_WAIT,
89            })),
90        };
91        let startup = I2pRuntime::from_config(&config);
92        config.runtime = Arc::new(Mutex::new(startup));
93        config
94    }
95}
96
97/// Writer that sends HDLC-framed data over an I2P SAM stream.
98struct I2pWriter {
99    stream: std::net::TcpStream,
100}
101
102impl Writer for I2pWriter {
103    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
104        self.stream.write_all(&hdlc::frame(data))
105    }
106}
107
108/// Sanitize a name for use in filenames and SAM session IDs.
109fn sanitize_name(name: &str) -> String {
110    name.chars()
111        .map(|c| {
112            if c.is_alphanumeric() || c == '-' || c == '_' {
113                c
114            } else {
115                '_'
116            }
117        })
118        .collect()
119}
120
121/// Key file path for the given interface name.
122fn key_file_path(storage_dir: &PathBuf, name: &str) -> PathBuf {
123    storage_dir.join(format!("i2p_{}.key", sanitize_name(name)))
124}
125
126/// Load or generate an I2P keypair. Persists the private key to disk
127/// so the node keeps a stable I2P address across restarts.
128fn load_or_generate_keypair(
129    sam_addr: &SocketAddr,
130    storage_dir: &PathBuf,
131    name: &str,
132) -> Result<sam::KeyPair, SamError> {
133    let key_path = key_file_path(storage_dir, name);
134
135    if key_path.exists() {
136        // Load existing private key
137        let priv_data = std::fs::read(&key_path).map_err(SamError::Io)?;
138
139        // We need the public destination too. Create a temporary session to extract it,
140        // or we can derive it by creating a session and reading back the destination.
141        // Simpler: store the full keypair. Let's re-generate from the private key
142        // by creating a session (the destination is embedded in the private key).
143        // Actually, the I2P private key blob contains the destination as a prefix.
144        // We can extract the destination from the private key.
145        // The destination is the first portion of the private key blob.
146        // For simplicity, store the full PRIV blob (which includes the destination).
147
148        // The PRIV blob from DEST GENERATE contains both the destination and private keys.
149        // The destination is extractable as the first ~387 bytes, but the exact size
150        // depends on the key type. For Ed25519 (sig type 7), the destination is:
151        //   256 bytes (encryption public key) + 128 bytes (signing public key) + 3 bytes (certificate)
152        // = 387 bytes. But this can vary with certificate content.
153
154        // Rather than parsing, we pass the full PRIV to SESSION CREATE (which accepts it)
155        // and then do a NAMING LOOKUP on "ME" to get our destination.
156
157        Ok(sam::KeyPair {
158            destination: Destination { data: Vec::new() }, // filled in later
159            private_key: priv_data,
160        })
161    } else {
162        // Generate new keypair
163        log::info!("[{}] generating new I2P destination keypair", name);
164        let keypair = sam::dest_generate(sam_addr)?;
165
166        // Save private key to disk
167        if let Some(parent) = key_path.parent() {
168            std::fs::create_dir_all(parent).map_err(SamError::Io)?;
169        }
170        std::fs::write(&key_path, &keypair.private_key).map_err(SamError::Io)?;
171        log::info!("[{}] saved I2P key to {:?}", name, key_path);
172
173        Ok(keypair)
174    }
175}
176
177/// Start the I2P interface coordinator. All peer connections are registered
178/// as dynamic interfaces via InterfaceUp events.
179pub fn start(config: I2pConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
180    let name = config.name.clone();
181
182    thread::Builder::new()
183        .name(format!("i2p-coord-{}", config.interface_id.0))
184        .spawn(move || {
185            if let Err(e) = coordinator(config, tx, next_id) {
186                log::error!("[{}] I2P coordinator failed: {}", name, e);
187            }
188        })?;
189
190    Ok(())
191}
192
193/// Coordinator thread: sets up SAM sessions and spawns peer threads.
194fn coordinator(
195    config: I2pConfig,
196    tx: EventSender,
197    next_id: Arc<AtomicU64>,
198) -> Result<(), SamError> {
199    let sam_addr: SocketAddr = format!("{}:{}", config.sam_host, config.sam_port)
200        .parse()
201        .map_err(|e| SamError::Io(io::Error::new(io::ErrorKind::InvalidInput, e)))?;
202
203    // Load or generate keypair
204    let keypair = load_or_generate_keypair(&sam_addr, &config.storage_dir, &config.name)?;
205    let priv_b64 = sam::i2p_base64_encode(&keypair.private_key);
206
207    // We use a single session for all streams (outbound + inbound).
208    // Session ID must be unique; use the interface name sanitized.
209    let session_id = sanitize_name(&config.name);
210
211    log::info!("[{}] creating SAM session (id={})", config.name, session_id);
212    let mut control_socket = sam::session_create(&sam_addr, &session_id, &priv_b64)?;
213
214    // Look up our own destination via NAMING LOOKUP "ME" on the session control socket.
215    // "ME" requires a session context on the same connection.
216    match sam::naming_lookup_on(&mut control_socket, "ME") {
217        Ok(our_dest) => {
218            let b32 = our_dest.base32_address();
219            log::info!("[{}] I2P address: {}", config.name, b32);
220        }
221        Err(e) => {
222            log::warn!("[{}] could not look up own destination: {}", config.name, e);
223        }
224    }
225
226    // Spawn outbound peer threads
227    for peer_addr in &config.peers {
228        let peer_addr = peer_addr.trim().to_string();
229        if peer_addr.is_empty() {
230            continue;
231        }
232
233        let tx2 = tx.clone();
234        let next_id2 = next_id.clone();
235        let sam_addr2 = sam_addr;
236        let session_id2 = session_id.clone();
237        let iface_name = config.name.clone();
238        let runtime = Arc::clone(&config.runtime);
239
240        thread::Builder::new()
241            .name(format!("i2p-out-{}", peer_addr))
242            .spawn(move || {
243                outbound_peer_loop(
244                    sam_addr2,
245                    &session_id2,
246                    &peer_addr,
247                    &iface_name,
248                    tx2,
249                    next_id2,
250                    runtime,
251                );
252            })
253            .ok();
254    }
255
256    // Spawn acceptor thread if connectable
257    if config.connectable {
258        let tx2 = tx.clone();
259        let next_id2 = next_id.clone();
260        let sam_addr2 = sam_addr;
261        let session_id2 = session_id.clone();
262        let iface_name = config.name.clone();
263
264        thread::Builder::new()
265            .name("i2p-acceptor".into())
266            .spawn(move || {
267                acceptor_loop(sam_addr2, &session_id2, &iface_name, tx2, next_id2);
268            })
269            .ok();
270    }
271
272    // The control socket (and this thread) must remain alive for the session's lifetime.
273    // We park the coordinator thread here. If control_socket drops, the session ends.
274    let _keep_alive = control_socket;
275    loop {
276        thread::sleep(Duration::from_secs(3600));
277    }
278}
279
280/// Outbound peer thread: connects to a remote I2P destination, runs HDLC reader loop.
281/// Reconnects on failure.
282fn outbound_peer_loop(
283    sam_addr: SocketAddr,
284    session_id: &str,
285    peer_addr: &str,
286    iface_name: &str,
287    tx: EventSender,
288    next_id: Arc<AtomicU64>,
289    runtime: Arc<Mutex<I2pRuntime>>,
290) {
291    loop {
292        log::info!("[{}] connecting to I2P peer {}", iface_name, peer_addr);
293
294        // Resolve .b32.i2p address if needed
295        let destination = if peer_addr.ends_with(".i2p") {
296            match sam::naming_lookup(&sam_addr, peer_addr) {
297                Ok(dest) => dest.to_i2p_base64(),
298                Err(e) => {
299                    log::warn!("[{}] failed to resolve {}: {}", iface_name, peer_addr, e);
300                    thread::sleep(runtime.lock().unwrap().reconnect_wait);
301                    continue;
302                }
303            }
304        } else {
305            // Assume it's already a full base64 destination
306            peer_addr.to_string()
307        };
308
309        // Connect via SAM
310        match sam::stream_connect(&sam_addr, session_id, &destination) {
311            Ok(stream) => {
312                let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
313
314                log::info!(
315                    "[{}] connected to I2P peer {} → id {}",
316                    iface_name,
317                    peer_addr,
318                    client_id.0
319                );
320
321                // Clone stream for writer/reader split
322                let writer_stream = match stream.try_clone() {
323                    Ok(s) => s,
324                    Err(e) => {
325                        log::warn!("[{}] failed to clone stream: {}", iface_name, e);
326                        thread::sleep(runtime.lock().unwrap().reconnect_wait);
327                        continue;
328                    }
329                };
330
331                let writer: Box<dyn Writer> = Box::new(I2pWriter {
332                    stream: writer_stream,
333                });
334
335                let info = InterfaceInfo {
336                    id: client_id,
337                    name: format!("I2PInterface/{}", peer_addr),
338                    mode: constants::MODE_FULL,
339                    out_capable: true,
340                    in_capable: true,
341                    bitrate: Some(BITRATE_GUESS),
342                    announce_rate_target: None,
343                    announce_rate_grace: 0,
344                    announce_rate_penalty: 0.0,
345                    announce_cap: constants::ANNOUNCE_CAP,
346                    is_local_client: false,
347                    wants_tunnel: false,
348                    tunnel_id: None,
349                    mtu: 65535,
350                    ia_freq: 0.0,
351                    started: 0.0,
352                    ingress_control: true,
353                };
354
355                // Register dynamic interface
356                if tx
357                    .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
358                    .is_err()
359                {
360                    return; // Driver shut down
361                }
362
363                // Run HDLC reader loop (blocks until disconnect)
364                peer_reader_loop(stream, client_id, iface_name, &tx);
365
366                // Disconnected
367                let _ = tx.send(Event::InterfaceDown(client_id));
368                log::warn!(
369                    "[{}] I2P peer {} disconnected, reconnecting in {}s",
370                    iface_name,
371                    peer_addr,
372                    runtime.lock().unwrap().reconnect_wait.as_secs()
373                );
374            }
375            Err(e) => {
376                log::warn!(
377                    "[{}] failed to connect to I2P peer {}: {}",
378                    iface_name,
379                    peer_addr,
380                    e
381                );
382            }
383        }
384
385        thread::sleep(runtime.lock().unwrap().reconnect_wait);
386    }
387}
388
389/// Acceptor thread: loops accepting inbound connections on the session.
390fn acceptor_loop(
391    sam_addr: SocketAddr,
392    session_id: &str,
393    iface_name: &str,
394    tx: EventSender,
395    next_id: Arc<AtomicU64>,
396) {
397    loop {
398        match sam::stream_accept(&sam_addr, session_id) {
399            Ok((stream, remote_dest)) => {
400                let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
401                let remote_b32 = remote_dest.base32_address();
402
403                log::info!(
404                    "[{}] accepted I2P connection from {} → id {}",
405                    iface_name,
406                    remote_b32,
407                    client_id.0
408                );
409
410                // Clone stream for writer/reader split
411                let writer_stream = match stream.try_clone() {
412                    Ok(s) => s,
413                    Err(e) => {
414                        log::warn!("[{}] failed to clone accepted stream: {}", iface_name, e);
415                        continue;
416                    }
417                };
418
419                let writer: Box<dyn Writer> = Box::new(I2pWriter {
420                    stream: writer_stream,
421                });
422
423                let info = InterfaceInfo {
424                    id: client_id,
425                    name: format!("I2PInterface/{}", remote_b32),
426                    mode: constants::MODE_FULL,
427                    out_capable: true,
428                    in_capable: true,
429                    bitrate: Some(BITRATE_GUESS),
430                    announce_rate_target: None,
431                    announce_rate_grace: 0,
432                    announce_rate_penalty: 0.0,
433                    announce_cap: constants::ANNOUNCE_CAP,
434                    is_local_client: false,
435                    wants_tunnel: false,
436                    tunnel_id: None,
437                    mtu: 65535,
438                    ia_freq: 0.0,
439                    started: 0.0,
440                    ingress_control: true,
441                };
442
443                // Register dynamic interface
444                if tx
445                    .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
446                    .is_err()
447                {
448                    return; // Driver shut down
449                }
450
451                // Spawn per-client reader thread (accepted peers don't reconnect)
452                let client_tx = tx.clone();
453                let client_name = iface_name.to_string();
454                thread::Builder::new()
455                    .name(format!("i2p-client-{}", client_id.0))
456                    .spawn(move || {
457                        peer_reader_loop(stream, client_id, &client_name, &client_tx);
458                        let _ = client_tx.send(Event::InterfaceDown(client_id));
459                    })
460                    .ok();
461            }
462            Err(e) => {
463                log::warn!("[{}] I2P accept failed: {}, retrying", iface_name, e);
464                thread::sleep(Duration::from_secs(1));
465            }
466        }
467    }
468}
469
470/// Per-peer HDLC reader loop. Reads from the SAM data stream, decodes HDLC
471/// frames, and sends them to the driver. Returns when the stream is closed.
472fn peer_reader_loop(
473    mut stream: std::net::TcpStream,
474    id: InterfaceId,
475    name: &str,
476    tx: &EventSender,
477) {
478    let mut decoder = hdlc::Decoder::new();
479    let mut buf = [0u8; 4096];
480
481    loop {
482        match stream.read(&mut buf) {
483            Ok(0) => {
484                log::info!("[{}] I2P peer {} disconnected", name, id.0);
485                return;
486            }
487            Ok(n) => {
488                for frame in decoder.feed(&buf[..n]) {
489                    if tx
490                        .send(Event::Frame {
491                            interface_id: id,
492                            data: frame,
493                        })
494                        .is_err()
495                    {
496                        return; // Driver shut down
497                    }
498                }
499            }
500            Err(e) => {
501                log::warn!("[{}] I2P peer {} read error: {}", name, id.0, e);
502                return;
503            }
504        }
505    }
506}
507
508// --- Factory implementation ---
509
510use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
511use std::collections::HashMap;
512
513/// Factory for `I2PInterface`.
514pub struct I2pFactory;
515
516impl InterfaceFactory for I2pFactory {
517    fn type_name(&self) -> &str {
518        "I2PInterface"
519    }
520
521    fn parse_config(
522        &self,
523        name: &str,
524        id: InterfaceId,
525        params: &HashMap<String, String>,
526    ) -> Result<Box<dyn InterfaceConfigData>, String> {
527        let sam_host = params
528            .get("sam_host")
529            .cloned()
530            .unwrap_or_else(|| "127.0.0.1".into());
531
532        let sam_port = params
533            .get("sam_port")
534            .and_then(|v| v.parse::<u16>().ok())
535            .unwrap_or(7656);
536
537        let connectable = params
538            .get("connectable")
539            .and_then(|v| crate::config::parse_bool_pub(v))
540            .unwrap_or(false);
541
542        let peers = params
543            .get("peers")
544            .map(|v| {
545                v.split(',')
546                    .map(|s| s.trim().to_string())
547                    .filter(|s| !s.is_empty())
548                    .collect::<Vec<String>>()
549            })
550            .unwrap_or_default();
551
552        let storage_dir = params
553            .get("storage_dir")
554            .map(PathBuf::from)
555            .unwrap_or_else(|| PathBuf::from("/tmp/rns-i2p"));
556
557        Ok(Box::new(I2pConfig {
558            name: name.to_string(),
559            interface_id: id,
560            sam_host,
561            sam_port,
562            connectable,
563            peers,
564            storage_dir,
565            runtime: Arc::new(Mutex::new(I2pRuntime {
566                reconnect_wait: RECONNECT_WAIT,
567            })),
568        }))
569    }
570
571    fn start(
572        &self,
573        config: Box<dyn InterfaceConfigData>,
574        ctx: StartContext,
575    ) -> io::Result<StartResult> {
576        let cfg = *config
577            .into_any()
578            .downcast::<I2pConfig>()
579            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
580        start(cfg, ctx.tx, ctx.next_dynamic_id)?;
581        Ok(StartResult::Listener { control: None })
582    }
583}
584
585pub(crate) fn i2p_runtime_handle_from_config(config: &I2pConfig) -> I2pRuntimeConfigHandle {
586    I2pRuntimeConfigHandle {
587        interface_name: config.name.clone(),
588        runtime: Arc::clone(&config.runtime),
589        startup: I2pRuntime::from_config(config),
590    }
591}