Skip to main content

rns_net/interface/i2p/
mod.rs

1//! I2P interface using the SAM v3.1 protocol.
2//!
3//! Provides anonymous transport over the I2P network. The interface manages
4//! both outbound peer connections and inbound connection acceptance.
5//! Each peer connection becomes a dynamic interface with HDLC framing,
6//! similar to TCP server client connections.
7//!
8//! Matches Python `I2PInterface` from `I2PInterface.py`.
9
10pub mod sam;
11
12use std::io::{self, Read, Write};
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::thread;
18use std::time::Duration;
19
20use rns_core::constants;
21use rns_core::transport::types::{InterfaceId, InterfaceInfo};
22
23use crate::event::{Event, EventSender};
24use crate::hdlc;
25use crate::interface::Writer;
26
27use self::sam::{Destination, SamError};
28
/// Hardware MTU for I2P streams (matches Python I2PInterface).
///
/// NOTE(review): nothing visible in this file reads this constant (hence the
/// `dead_code` allowance); the peer interfaces registered by this module
/// advertise `mtu: 65535` instead — confirm that this is intended.
#[allow(dead_code)]
const HW_MTU: usize = 1064;

/// Estimated bitrate for I2P tunnels (256 kbps, matches Python).
///
/// Reported as the `bitrate` of every peer interface this module registers.
const BITRATE_GUESS: u64 = 256_000;

/// Wait time before reconnecting to an outbound peer.
///
/// This is the start-up value of `I2pRuntime::reconnect_wait`.
const RECONNECT_WAIT: Duration = Duration::from_secs(15);
38
/// Configuration for the I2P interface.
#[derive(Debug, Clone)]
pub struct I2pConfig {
    /// Interface name. Used in log messages and, after sanitizing, as the
    /// SAM session ID and as part of the key file name.
    pub name: String,
    /// Identifier of the configured interface; used to name the coordinator thread.
    pub interface_id: InterfaceId,
    /// SAM bridge host (default "127.0.0.1").
    pub sam_host: String,
    /// SAM bridge port (default 7656).
    pub sam_port: u16,
    /// List of .b32.i2p peer addresses (or full base64 destinations) to connect to.
    pub peers: Vec<String>,
    /// Whether to accept inbound connections.
    pub connectable: bool,
    /// Directory for key persistence (typically `{config_dir}/storage`).
    pub storage_dir: PathBuf,
    /// Ingress control settings copied into every peer interface that is registered.
    pub ingress_control: rns_core::transport::types::IngressControlConfig,
    /// Shared, lock-protected runtime settings (currently the reconnect delay)
    /// read by the outbound peer threads.
    pub runtime: Arc<Mutex<I2pRuntime>>,
}
57
/// Runtime settings of the I2P interface, shared behind an `Arc<Mutex<_>>`
/// and re-read by the outbound peer threads before each wait.
#[derive(Debug, Clone)]
pub struct I2pRuntime {
    /// How long an outbound peer thread waits before its next connection attempt.
    pub reconnect_wait: Duration,
}
62
63impl I2pRuntime {
64    pub fn from_config(_config: &I2pConfig) -> Self {
65        Self {
66            reconnect_wait: RECONNECT_WAIT,
67        }
68    }
69}
70
/// Handle bundling an interface's shared runtime settings with the values it
/// started with.
///
/// NOTE(review): presumably consumed by a runtime-configuration layer outside
/// this file — confirm against the caller.
#[derive(Debug, Clone)]
pub struct I2pRuntimeConfigHandle {
    /// Name of the interface the handle belongs to.
    pub interface_name: String,
    /// The same shared runtime state the interface's threads read from.
    pub runtime: Arc<Mutex<I2pRuntime>>,
    /// Runtime values as derived from the configuration when the handle was built.
    pub startup: I2pRuntime,
}
77
78impl Default for I2pConfig {
79    fn default() -> Self {
80        let mut config = I2pConfig {
81            name: String::new(),
82            interface_id: InterfaceId(0),
83            sam_host: "127.0.0.1".into(),
84            sam_port: 7656,
85            peers: Vec::new(),
86            connectable: false,
87            storage_dir: PathBuf::from("."),
88            ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
89            runtime: Arc::new(Mutex::new(I2pRuntime {
90                reconnect_wait: RECONNECT_WAIT,
91            })),
92        };
93        let startup = I2pRuntime::from_config(&config);
94        config.runtime = Arc::new(Mutex::new(startup));
95        config
96    }
97}
98
/// Writer that sends HDLC-framed data over an I2P SAM stream.
struct I2pWriter {
    /// Cloned handle of the socket returned by `sam::stream_connect` /
    /// `sam::stream_accept`; the reader loop keeps the other handle.
    stream: std::net::TcpStream,
}
103
104impl Writer for I2pWriter {
105    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
106        self.stream.write_all(&hdlc::frame(data))
107    }
108}
109
/// Sanitize a name for use in filenames and SAM session IDs.
///
/// Alphanumeric characters (in the Unicode sense), `-` and `_` are kept;
/// every other character is replaced by a single `_`, so the result has the
/// same number of characters as the input.
fn sanitize_name(name: &str) -> String {
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, '-' | '_') || ch.is_alphanumeric();
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
122
123/// Key file path for the given interface name.
124fn key_file_path(storage_dir: &PathBuf, name: &str) -> PathBuf {
125    storage_dir.join(format!("i2p_{}.key", sanitize_name(name)))
126}
127
128/// Load or generate an I2P keypair. Persists the private key to disk
129/// so the node keeps a stable I2P address across restarts.
130fn load_or_generate_keypair(
131    sam_addr: &SocketAddr,
132    storage_dir: &PathBuf,
133    name: &str,
134) -> Result<sam::KeyPair, SamError> {
135    let key_path = key_file_path(storage_dir, name);
136
137    if key_path.exists() {
138        // Load existing private key
139        let priv_data = std::fs::read(&key_path).map_err(SamError::Io)?;
140
141        // We need the public destination too. Create a temporary session to extract it,
142        // or we can derive it by creating a session and reading back the destination.
143        // Simpler: store the full keypair. Let's re-generate from the private key
144        // by creating a session (the destination is embedded in the private key).
145        // Actually, the I2P private key blob contains the destination as a prefix.
146        // We can extract the destination from the private key.
147        // The destination is the first portion of the private key blob.
148        // For simplicity, store the full PRIV blob (which includes the destination).
149
150        // The PRIV blob from DEST GENERATE contains both the destination and private keys.
151        // The destination is extractable as the first ~387 bytes, but the exact size
152        // depends on the key type. For Ed25519 (sig type 7), the destination is:
153        //   256 bytes (encryption public key) + 128 bytes (signing public key) + 3 bytes (certificate)
154        // = 387 bytes. But this can vary with certificate content.
155
156        // Rather than parsing, we pass the full PRIV to SESSION CREATE (which accepts it)
157        // and then do a NAMING LOOKUP on "ME" to get our destination.
158
159        Ok(sam::KeyPair {
160            destination: Destination { data: Vec::new() }, // filled in later
161            private_key: priv_data,
162        })
163    } else {
164        // Generate new keypair
165        log::info!("[{}] generating new I2P destination keypair", name);
166        let keypair = sam::dest_generate(sam_addr)?;
167
168        // Save private key to disk
169        if let Some(parent) = key_path.parent() {
170            std::fs::create_dir_all(parent).map_err(SamError::Io)?;
171        }
172        std::fs::write(&key_path, &keypair.private_key).map_err(SamError::Io)?;
173        log::info!("[{}] saved I2P key to {:?}", name, key_path);
174
175        Ok(keypair)
176    }
177}
178
179/// Start the I2P interface coordinator. All peer connections are registered
180/// as dynamic interfaces via InterfaceUp events.
181pub fn start(config: I2pConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
182    let name = config.name.clone();
183
184    thread::Builder::new()
185        .name(format!("i2p-coord-{}", config.interface_id.0))
186        .spawn(move || {
187            if let Err(e) = coordinator(config, tx, next_id) {
188                log::error!("[{}] I2P coordinator failed: {}", name, e);
189            }
190        })?;
191
192    Ok(())
193}
194
195/// Coordinator thread: sets up SAM sessions and spawns peer threads.
196fn coordinator(
197    config: I2pConfig,
198    tx: EventSender,
199    next_id: Arc<AtomicU64>,
200) -> Result<(), SamError> {
201    let ingress_control = config.ingress_control;
202    let sam_addr: SocketAddr = format!("{}:{}", config.sam_host, config.sam_port)
203        .parse()
204        .map_err(|e| SamError::Io(io::Error::new(io::ErrorKind::InvalidInput, e)))?;
205
206    // Load or generate keypair
207    let keypair = load_or_generate_keypair(&sam_addr, &config.storage_dir, &config.name)?;
208    let priv_b64 = sam::i2p_base64_encode(&keypair.private_key);
209
210    // We use a single session for all streams (outbound + inbound).
211    // Session ID must be unique; use the interface name sanitized.
212    let session_id = sanitize_name(&config.name);
213
214    log::info!("[{}] creating SAM session (id={})", config.name, session_id);
215    let mut control_socket = sam::session_create(&sam_addr, &session_id, &priv_b64)?;
216
217    // Look up our own destination via NAMING LOOKUP "ME" on the session control socket.
218    // "ME" requires a session context on the same connection.
219    match sam::naming_lookup_on(&mut control_socket, "ME") {
220        Ok(our_dest) => {
221            let b32 = our_dest.base32_address();
222            log::info!("[{}] I2P address: {}", config.name, b32);
223        }
224        Err(e) => {
225            log::warn!("[{}] could not look up own destination: {}", config.name, e);
226        }
227    }
228
229    // Spawn outbound peer threads
230    for peer_addr in &config.peers {
231        let peer_addr = peer_addr.trim().to_string();
232        if peer_addr.is_empty() {
233            continue;
234        }
235
236        let tx2 = tx.clone();
237        let next_id2 = next_id.clone();
238        let sam_addr2 = sam_addr;
239        let session_id2 = session_id.clone();
240        let iface_name = config.name.clone();
241        let runtime = Arc::clone(&config.runtime);
242
243        thread::Builder::new()
244            .name(format!("i2p-out-{}", peer_addr))
245            .spawn(move || {
246                outbound_peer_loop(
247                    sam_addr2,
248                    &session_id2,
249                    &peer_addr,
250                    &iface_name,
251                    tx2,
252                    next_id2,
253                    runtime,
254                    ingress_control,
255                );
256            })
257            .ok();
258    }
259
260    // Spawn acceptor thread if connectable
261    if config.connectable {
262        let tx2 = tx.clone();
263        let next_id2 = next_id.clone();
264        let sam_addr2 = sam_addr;
265        let session_id2 = session_id.clone();
266        let iface_name = config.name.clone();
267
268        thread::Builder::new()
269            .name("i2p-acceptor".into())
270            .spawn(move || {
271                acceptor_loop(
272                    sam_addr2,
273                    &session_id2,
274                    &iface_name,
275                    tx2,
276                    next_id2,
277                    ingress_control,
278                );
279            })
280            .ok();
281    }
282
283    // The control socket (and this thread) must remain alive for the session's lifetime.
284    // We park the coordinator thread here. If control_socket drops, the session ends.
285    let _keep_alive = control_socket;
286    loop {
287        thread::sleep(Duration::from_secs(3600));
288    }
289}
290
291/// Outbound peer thread: connects to a remote I2P destination, runs HDLC reader loop.
292/// Reconnects on failure.
293fn outbound_peer_loop(
294    sam_addr: SocketAddr,
295    session_id: &str,
296    peer_addr: &str,
297    iface_name: &str,
298    tx: EventSender,
299    next_id: Arc<AtomicU64>,
300    runtime: Arc<Mutex<I2pRuntime>>,
301    ingress_control: rns_core::transport::types::IngressControlConfig,
302) {
303    loop {
304        log::info!("[{}] connecting to I2P peer {}", iface_name, peer_addr);
305
306        // Resolve .b32.i2p address if needed
307        let destination = if peer_addr.ends_with(".i2p") {
308            match sam::naming_lookup(&sam_addr, peer_addr) {
309                Ok(dest) => dest.to_i2p_base64(),
310                Err(e) => {
311                    log::warn!("[{}] failed to resolve {}: {}", iface_name, peer_addr, e);
312                    thread::sleep(runtime.lock().unwrap().reconnect_wait);
313                    continue;
314                }
315            }
316        } else {
317            // Assume it's already a full base64 destination
318            peer_addr.to_string()
319        };
320
321        // Connect via SAM
322        match sam::stream_connect(&sam_addr, session_id, &destination) {
323            Ok(stream) => {
324                let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
325
326                log::info!(
327                    "[{}] connected to I2P peer {} → id {}",
328                    iface_name,
329                    peer_addr,
330                    client_id.0
331                );
332
333                // Clone stream for writer/reader split
334                let writer_stream = match stream.try_clone() {
335                    Ok(s) => s,
336                    Err(e) => {
337                        log::warn!("[{}] failed to clone stream: {}", iface_name, e);
338                        thread::sleep(runtime.lock().unwrap().reconnect_wait);
339                        continue;
340                    }
341                };
342
343                let writer: Box<dyn Writer> = Box::new(I2pWriter {
344                    stream: writer_stream,
345                });
346
347                let info = InterfaceInfo {
348                    id: client_id,
349                    name: format!("I2PInterface/{}", peer_addr),
350                    mode: constants::MODE_FULL,
351                    out_capable: true,
352                    in_capable: true,
353                    bitrate: Some(BITRATE_GUESS),
354                    announce_rate_target: None,
355                    announce_rate_grace: 0,
356                    announce_rate_penalty: 0.0,
357                    announce_cap: constants::ANNOUNCE_CAP,
358                    is_local_client: false,
359                    wants_tunnel: false,
360                    tunnel_id: None,
361                    mtu: 65535,
362                    ia_freq: 0.0,
363                    started: 0.0,
364                    ingress_control,
365                };
366
367                // Register dynamic interface
368                if tx
369                    .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
370                    .is_err()
371                {
372                    return; // Driver shut down
373                }
374
375                // Run HDLC reader loop (blocks until disconnect)
376                peer_reader_loop(stream, client_id, iface_name, &tx);
377
378                // Disconnected
379                let _ = tx.send(Event::InterfaceDown(client_id));
380                log::warn!(
381                    "[{}] I2P peer {} disconnected, reconnecting in {}s",
382                    iface_name,
383                    peer_addr,
384                    runtime.lock().unwrap().reconnect_wait.as_secs()
385                );
386            }
387            Err(e) => {
388                log::warn!(
389                    "[{}] failed to connect to I2P peer {}: {}",
390                    iface_name,
391                    peer_addr,
392                    e
393                );
394            }
395        }
396
397        thread::sleep(runtime.lock().unwrap().reconnect_wait);
398    }
399}
400
401/// Acceptor thread: loops accepting inbound connections on the session.
402fn acceptor_loop(
403    sam_addr: SocketAddr,
404    session_id: &str,
405    iface_name: &str,
406    tx: EventSender,
407    next_id: Arc<AtomicU64>,
408    ingress_control: rns_core::transport::types::IngressControlConfig,
409) {
410    loop {
411        match sam::stream_accept(&sam_addr, session_id) {
412            Ok((stream, remote_dest)) => {
413                let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
414                let remote_b32 = remote_dest.base32_address();
415
416                log::info!(
417                    "[{}] accepted I2P connection from {} → id {}",
418                    iface_name,
419                    remote_b32,
420                    client_id.0
421                );
422
423                // Clone stream for writer/reader split
424                let writer_stream = match stream.try_clone() {
425                    Ok(s) => s,
426                    Err(e) => {
427                        log::warn!("[{}] failed to clone accepted stream: {}", iface_name, e);
428                        continue;
429                    }
430                };
431
432                let writer: Box<dyn Writer> = Box::new(I2pWriter {
433                    stream: writer_stream,
434                });
435
436                let info = InterfaceInfo {
437                    id: client_id,
438                    name: format!("I2PInterface/{}", remote_b32),
439                    mode: constants::MODE_FULL,
440                    out_capable: true,
441                    in_capable: true,
442                    bitrate: Some(BITRATE_GUESS),
443                    announce_rate_target: None,
444                    announce_rate_grace: 0,
445                    announce_rate_penalty: 0.0,
446                    announce_cap: constants::ANNOUNCE_CAP,
447                    is_local_client: false,
448                    wants_tunnel: false,
449                    tunnel_id: None,
450                    mtu: 65535,
451                    ia_freq: 0.0,
452                    started: 0.0,
453                    ingress_control,
454                };
455
456                // Register dynamic interface
457                if tx
458                    .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
459                    .is_err()
460                {
461                    return; // Driver shut down
462                }
463
464                // Spawn per-client reader thread (accepted peers don't reconnect)
465                let client_tx = tx.clone();
466                let client_name = iface_name.to_string();
467                thread::Builder::new()
468                    .name(format!("i2p-client-{}", client_id.0))
469                    .spawn(move || {
470                        peer_reader_loop(stream, client_id, &client_name, &client_tx);
471                        let _ = client_tx.send(Event::InterfaceDown(client_id));
472                    })
473                    .ok();
474            }
475            Err(e) => {
476                log::warn!("[{}] I2P accept failed: {}, retrying", iface_name, e);
477                thread::sleep(Duration::from_secs(1));
478            }
479        }
480    }
481}
482
483/// Per-peer HDLC reader loop. Reads from the SAM data stream, decodes HDLC
484/// frames, and sends them to the driver. Returns when the stream is closed.
485fn peer_reader_loop(
486    mut stream: std::net::TcpStream,
487    id: InterfaceId,
488    name: &str,
489    tx: &EventSender,
490) {
491    let mut decoder = hdlc::Decoder::new();
492    let mut buf = [0u8; 4096];
493
494    loop {
495        match stream.read(&mut buf) {
496            Ok(0) => {
497                log::info!("[{}] I2P peer {} disconnected", name, id.0);
498                return;
499            }
500            Ok(n) => {
501                for frame in decoder.feed(&buf[..n]) {
502                    if tx
503                        .send(Event::Frame {
504                            interface_id: id,
505                            data: frame,
506                        })
507                        .is_err()
508                    {
509                        return; // Driver shut down
510                    }
511                }
512            }
513            Err(e) => {
514                log::warn!("[{}] I2P peer {} read error: {}", name, id.0, e);
515                return;
516            }
517        }
518    }
519}
520
521// --- Factory implementation ---
522
523use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
524use std::collections::HashMap;
525
/// Factory for `I2PInterface`.
///
/// Stateless unit type; its `InterfaceFactory` implementation parses the
/// interface's configuration parameters and starts the coordinator thread.
pub struct I2pFactory;
528
529impl InterfaceFactory for I2pFactory {
530    fn type_name(&self) -> &str {
531        "I2PInterface"
532    }
533
534    fn parse_config(
535        &self,
536        name: &str,
537        id: InterfaceId,
538        params: &HashMap<String, String>,
539    ) -> Result<Box<dyn InterfaceConfigData>, String> {
540        let sam_host = params
541            .get("sam_host")
542            .cloned()
543            .unwrap_or_else(|| "127.0.0.1".into());
544
545        let sam_port = params
546            .get("sam_port")
547            .and_then(|v| v.parse::<u16>().ok())
548            .unwrap_or(7656);
549
550        let connectable = params
551            .get("connectable")
552            .and_then(|v| crate::config::parse_bool_pub(v))
553            .unwrap_or(false);
554
555        let peers = params
556            .get("peers")
557            .map(|v| {
558                v.split(',')
559                    .map(|s| s.trim().to_string())
560                    .filter(|s| !s.is_empty())
561                    .collect::<Vec<String>>()
562            })
563            .unwrap_or_default();
564
565        let storage_dir = params
566            .get("storage_dir")
567            .map(PathBuf::from)
568            .unwrap_or_else(|| PathBuf::from("/tmp/rns-i2p"));
569
570        Ok(Box::new(I2pConfig {
571            name: name.to_string(),
572            interface_id: id,
573            sam_host,
574            sam_port,
575            connectable,
576            peers,
577            storage_dir,
578            ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
579            runtime: Arc::new(Mutex::new(I2pRuntime {
580                reconnect_wait: RECONNECT_WAIT,
581            })),
582        }))
583    }
584
585    fn start(
586        &self,
587        config: Box<dyn InterfaceConfigData>,
588        ctx: StartContext,
589    ) -> io::Result<StartResult> {
590        let mut cfg = *config
591            .into_any()
592            .downcast::<I2pConfig>()
593            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
594        cfg.ingress_control = ctx.ingress_control;
595        start(cfg, ctx.tx, ctx.next_dynamic_id)?;
596        Ok(StartResult::Listener { control: None })
597    }
598}
599
600pub(crate) fn i2p_runtime_handle_from_config(config: &I2pConfig) -> I2pRuntimeConfigHandle {
601    I2pRuntimeConfigHandle {
602        interface_name: config.name.clone(),
603        runtime: Arc::clone(&config.runtime),
604        startup: I2pRuntime::from_config(config),
605    }
606}