Skip to main content

rns_net/interface/i2p/
mod.rs

1//! I2P interface using the SAM v3.1 protocol.
2//!
3//! Provides anonymous transport over the I2P network. The interface manages
4//! both outbound peer connections and inbound connection acceptance.
5//! Each peer connection becomes a dynamic interface with HDLC framing,
6//! similar to TCP server client connections.
7//!
8//! Matches Python `I2PInterface` from `I2PInterface.py`.
9
10pub mod sam;
11
12use std::io::{self, Read, Write};
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Mutex};
17use std::thread;
18use std::time::Duration;
19
20use rns_core::constants;
21use rns_core::transport::types::{InterfaceId, InterfaceInfo};
22
23use crate::event::{Event, EventSender};
24use crate::hdlc;
25use crate::interface::{lock_or_recover, Writer};
26
27use self::sam::{Destination, SamError};
28
29/// Hardware MTU for I2P streams (matches Python I2PInterface).
30#[allow(dead_code)]
31const HW_MTU: usize = 1064;
32
33/// Estimated bitrate for I2P tunnels (256 kbps, matches Python).
34const BITRATE_GUESS: u64 = 256_000;
35
36/// Wait time before reconnecting to an outbound peer.
37const RECONNECT_WAIT: Duration = Duration::from_secs(15);
38
39/// Configuration for the I2P interface.
40#[derive(Debug, Clone)]
41pub struct I2pConfig {
42    pub name: String,
43    pub interface_id: InterfaceId,
44    /// SAM bridge host (default "127.0.0.1").
45    pub sam_host: String,
46    /// SAM bridge port (default 7656).
47    pub sam_port: u16,
48    /// List of .b32.i2p peer addresses (or full base64 destinations) to connect to.
49    pub peers: Vec<String>,
50    /// Whether to accept inbound connections.
51    pub connectable: bool,
52    /// Directory for key persistence (typically `{config_dir}/storage`).
53    pub storage_dir: PathBuf,
54    pub ingress_control: rns_core::transport::types::IngressControlConfig,
55    pub runtime: Arc<Mutex<I2pRuntime>>,
56}
57
58#[derive(Debug, Clone)]
59pub struct I2pRuntime {
60    pub reconnect_wait: Duration,
61}
62
63impl I2pRuntime {
64    pub fn from_config(_config: &I2pConfig) -> Self {
65        Self {
66            reconnect_wait: RECONNECT_WAIT,
67        }
68    }
69}
70
71#[derive(Debug, Clone)]
72pub struct I2pRuntimeConfigHandle {
73    pub interface_name: String,
74    pub runtime: Arc<Mutex<I2pRuntime>>,
75    pub startup: I2pRuntime,
76}
77
78impl Default for I2pConfig {
79    fn default() -> Self {
80        let mut config = I2pConfig {
81            name: String::new(),
82            interface_id: InterfaceId(0),
83            sam_host: "127.0.0.1".into(),
84            sam_port: 7656,
85            peers: Vec::new(),
86            connectable: false,
87            storage_dir: PathBuf::from("."),
88            ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
89            runtime: Arc::new(Mutex::new(I2pRuntime {
90                reconnect_wait: RECONNECT_WAIT,
91            })),
92        };
93        let startup = I2pRuntime::from_config(&config);
94        config.runtime = Arc::new(Mutex::new(startup));
95        config
96    }
97}
98
99/// Writer that sends HDLC-framed data over an I2P SAM stream.
100struct I2pWriter {
101    stream: std::net::TcpStream,
102}
103
104impl Writer for I2pWriter {
105    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
106        self.stream.write_all(&hdlc::frame(data))
107    }
108}
109
110/// Sanitize a name for use in filenames and SAM session IDs.
111fn sanitize_name(name: &str) -> String {
112    name.chars()
113        .map(|c| {
114            if c.is_alphanumeric() || c == '-' || c == '_' {
115                c
116            } else {
117                '_'
118            }
119        })
120        .collect()
121}
122
123/// Key file path for the given interface name.
124fn key_file_path(storage_dir: &PathBuf, name: &str) -> PathBuf {
125    storage_dir.join(format!("i2p_{}.key", sanitize_name(name)))
126}
127
128/// Load or generate an I2P keypair. Persists the private key to disk
129/// so the node keeps a stable I2P address across restarts.
130fn load_or_generate_keypair(
131    sam_addr: &SocketAddr,
132    storage_dir: &PathBuf,
133    name: &str,
134) -> Result<sam::KeyPair, SamError> {
135    let key_path = key_file_path(storage_dir, name);
136
137    if key_path.exists() {
138        // Load existing private key
139        let priv_data = std::fs::read(&key_path).map_err(SamError::Io)?;
140
141        // We need the public destination too. Create a temporary session to extract it,
142        // or we can derive it by creating a session and reading back the destination.
143        // Simpler: store the full keypair. Let's re-generate from the private key
144        // by creating a session (the destination is embedded in the private key).
145        // Actually, the I2P private key blob contains the destination as a prefix.
146        // We can extract the destination from the private key.
147        // The destination is the first portion of the private key blob.
148        // For simplicity, store the full PRIV blob (which includes the destination).
149
150        // The PRIV blob from DEST GENERATE contains both the destination and private keys.
151        // The destination is extractable as the first ~387 bytes, but the exact size
152        // depends on the key type. For Ed25519 (sig type 7), the destination is:
153        //   256 bytes (encryption public key) + 128 bytes (signing public key) + 3 bytes (certificate)
154        // = 387 bytes. But this can vary with certificate content.
155
156        // Rather than parsing, we pass the full PRIV to SESSION CREATE (which accepts it)
157        // and then do a NAMING LOOKUP on "ME" to get our destination.
158
159        Ok(sam::KeyPair {
160            destination: Destination { data: Vec::new() }, // filled in later
161            private_key: priv_data,
162        })
163    } else {
164        // Generate new keypair
165        log::info!("[{}] generating new I2P destination keypair", name);
166        let keypair = sam::dest_generate(sam_addr)?;
167
168        // Save private key to disk
169        if let Some(parent) = key_path.parent() {
170            std::fs::create_dir_all(parent).map_err(SamError::Io)?;
171        }
172        std::fs::write(&key_path, &keypair.private_key).map_err(SamError::Io)?;
173        log::info!("[{}] saved I2P key to {:?}", name, key_path);
174
175        Ok(keypair)
176    }
177}
178
179/// Start the I2P interface coordinator. All peer connections are registered
180/// as dynamic interfaces via InterfaceUp events.
181pub fn start(config: I2pConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
182    let name = config.name.clone();
183
184    thread::Builder::new()
185        .name(format!("i2p-coord-{}", config.interface_id.0))
186        .spawn(move || {
187            if let Err(e) = coordinator(config, tx, next_id) {
188                log::error!("[{}] I2P coordinator failed: {}", name, e);
189            }
190        })?;
191
192    Ok(())
193}
194
195/// Coordinator thread: sets up SAM sessions and spawns peer threads.
196fn coordinator(
197    config: I2pConfig,
198    tx: EventSender,
199    next_id: Arc<AtomicU64>,
200) -> Result<(), SamError> {
201    let ingress_control = config.ingress_control;
202    let sam_addr: SocketAddr = format!("{}:{}", config.sam_host, config.sam_port)
203        .parse()
204        .map_err(|e| SamError::Io(io::Error::new(io::ErrorKind::InvalidInput, e)))?;
205
206    // Load or generate keypair
207    let keypair = load_or_generate_keypair(&sam_addr, &config.storage_dir, &config.name)?;
208    let priv_b64 = sam::i2p_base64_encode(&keypair.private_key);
209
210    // We use a single session for all streams (outbound + inbound).
211    // Session ID must be unique; use the interface name sanitized.
212    let session_id = sanitize_name(&config.name);
213
214    log::info!("[{}] creating SAM session (id={})", config.name, session_id);
215    let mut control_socket = sam::session_create(&sam_addr, &session_id, &priv_b64)?;
216
217    // Look up our own destination via NAMING LOOKUP "ME" on the session control socket.
218    // "ME" requires a session context on the same connection.
219    match sam::naming_lookup_on(&mut control_socket, "ME") {
220        Ok(our_dest) => {
221            let b32 = our_dest.base32_address();
222            log::info!("[{}] I2P address: {}", config.name, b32);
223        }
224        Err(e) => {
225            log::warn!("[{}] could not look up own destination: {}", config.name, e);
226        }
227    }
228
229    // Spawn outbound peer threads
230    for peer_addr in &config.peers {
231        let peer_addr = peer_addr.trim().to_string();
232        if peer_addr.is_empty() {
233            continue;
234        }
235
236        let tx2 = tx.clone();
237        let next_id2 = next_id.clone();
238        let sam_addr2 = sam_addr;
239        let session_id2 = session_id.clone();
240        let iface_name = config.name.clone();
241        let runtime = Arc::clone(&config.runtime);
242
243        thread::Builder::new()
244            .name(format!("i2p-out-{}", peer_addr))
245            .spawn(move || {
246                outbound_peer_loop(
247                    sam_addr2,
248                    &session_id2,
249                    &peer_addr,
250                    &iface_name,
251                    tx2,
252                    next_id2,
253                    runtime,
254                    ingress_control,
255                );
256            })
257            .ok();
258    }
259
260    // Spawn acceptor thread if connectable
261    if config.connectable {
262        let tx2 = tx.clone();
263        let next_id2 = next_id.clone();
264        let sam_addr2 = sam_addr;
265        let session_id2 = session_id.clone();
266        let iface_name = config.name.clone();
267
268        thread::Builder::new()
269            .name("i2p-acceptor".into())
270            .spawn(move || {
271                acceptor_loop(
272                    sam_addr2,
273                    &session_id2,
274                    &iface_name,
275                    tx2,
276                    next_id2,
277                    ingress_control,
278                );
279            })
280            .ok();
281    }
282
283    // The control socket (and this thread) must remain alive for the session's lifetime.
284    // We park the coordinator thread here. If control_socket drops, the session ends.
285    let _keep_alive = control_socket;
286    loop {
287        thread::sleep(Duration::from_secs(3600));
288    }
289}
290
291/// Outbound peer thread: connects to a remote I2P destination, runs HDLC reader loop.
292/// Reconnects on failure.
293fn outbound_peer_loop(
294    sam_addr: SocketAddr,
295    session_id: &str,
296    peer_addr: &str,
297    iface_name: &str,
298    tx: EventSender,
299    next_id: Arc<AtomicU64>,
300    runtime: Arc<Mutex<I2pRuntime>>,
301    ingress_control: rns_core::transport::types::IngressControlConfig,
302) {
303    loop {
304        log::info!("[{}] connecting to I2P peer {}", iface_name, peer_addr);
305
306        // Resolve .b32.i2p address if needed
307        let destination = if peer_addr.ends_with(".i2p") {
308            match sam::naming_lookup(&sam_addr, peer_addr) {
309                Ok(dest) => dest.to_i2p_base64(),
310                Err(e) => {
311                    log::warn!("[{}] failed to resolve {}: {}", iface_name, peer_addr, e);
312                    thread::sleep(lock_or_recover(&runtime, "i2p runtime").reconnect_wait);
313                    continue;
314                }
315            }
316        } else {
317            // Assume it's already a full base64 destination
318            peer_addr.to_string()
319        };
320
321        // Connect via SAM
322        match sam::stream_connect(&sam_addr, session_id, &destination) {
323            Ok(stream) => {
324                let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
325
326                log::info!(
327                    "[{}] connected to I2P peer {} → id {}",
328                    iface_name,
329                    peer_addr,
330                    client_id.0
331                );
332
333                // Clone stream for writer/reader split
334                let writer_stream = match stream.try_clone() {
335                    Ok(s) => s,
336                    Err(e) => {
337                        log::warn!("[{}] failed to clone stream: {}", iface_name, e);
338                        thread::sleep(lock_or_recover(&runtime, "i2p runtime").reconnect_wait);
339                        continue;
340                    }
341                };
342
343                let writer: Box<dyn Writer> = Box::new(I2pWriter {
344                    stream: writer_stream,
345                });
346
347                let info = InterfaceInfo {
348                    id: client_id,
349                    name: format!("I2PInterface/{}", peer_addr),
350                    mode: constants::MODE_FULL,
351                    out_capable: true,
352                    in_capable: true,
353                    bitrate: Some(BITRATE_GUESS),
354                    airtime_profile: None,
355                    announce_rate_target: None,
356                    announce_rate_grace: 0,
357                    announce_rate_penalty: 0.0,
358                    announce_cap: constants::ANNOUNCE_CAP,
359                    is_local_client: false,
360                    wants_tunnel: false,
361                    tunnel_id: None,
362                    mtu: 65535,
363                    ia_freq: 0.0,
364                    started: 0.0,
365                    ingress_control,
366                };
367
368                // Register dynamic interface
369                if tx
370                    .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
371                    .is_err()
372                {
373                    return; // Driver shut down
374                }
375
376                // Run HDLC reader loop (blocks until disconnect)
377                peer_reader_loop(stream, client_id, iface_name, &tx);
378
379                // Disconnected
380                let _ = tx.send(Event::InterfaceDown(client_id));
381                log::warn!(
382                    "[{}] I2P peer {} disconnected, reconnecting in {}s",
383                    iface_name,
384                    peer_addr,
385                    lock_or_recover(&runtime, "i2p runtime")
386                        .reconnect_wait
387                        .as_secs()
388                );
389            }
390            Err(e) => {
391                log::warn!(
392                    "[{}] failed to connect to I2P peer {}: {}",
393                    iface_name,
394                    peer_addr,
395                    e
396                );
397            }
398        }
399
400        thread::sleep(lock_or_recover(&runtime, "i2p runtime").reconnect_wait);
401    }
402}
403
404/// Acceptor thread: loops accepting inbound connections on the session.
405fn acceptor_loop(
406    sam_addr: SocketAddr,
407    session_id: &str,
408    iface_name: &str,
409    tx: EventSender,
410    next_id: Arc<AtomicU64>,
411    ingress_control: rns_core::transport::types::IngressControlConfig,
412) {
413    loop {
414        match sam::stream_accept(&sam_addr, session_id) {
415            Ok((stream, remote_dest)) => {
416                let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
417                let remote_b32 = remote_dest.base32_address();
418
419                log::info!(
420                    "[{}] accepted I2P connection from {} → id {}",
421                    iface_name,
422                    remote_b32,
423                    client_id.0
424                );
425
426                // Clone stream for writer/reader split
427                let writer_stream = match stream.try_clone() {
428                    Ok(s) => s,
429                    Err(e) => {
430                        log::warn!("[{}] failed to clone accepted stream: {}", iface_name, e);
431                        continue;
432                    }
433                };
434
435                let writer: Box<dyn Writer> = Box::new(I2pWriter {
436                    stream: writer_stream,
437                });
438
439                let info = InterfaceInfo {
440                    id: client_id,
441                    name: format!("I2PInterface/{}", remote_b32),
442                    mode: constants::MODE_FULL,
443                    out_capable: true,
444                    in_capable: true,
445                    bitrate: Some(BITRATE_GUESS),
446                    airtime_profile: None,
447                    announce_rate_target: None,
448                    announce_rate_grace: 0,
449                    announce_rate_penalty: 0.0,
450                    announce_cap: constants::ANNOUNCE_CAP,
451                    is_local_client: false,
452                    wants_tunnel: false,
453                    tunnel_id: None,
454                    mtu: 65535,
455                    ia_freq: 0.0,
456                    started: 0.0,
457                    ingress_control,
458                };
459
460                // Register dynamic interface
461                if tx
462                    .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
463                    .is_err()
464                {
465                    return; // Driver shut down
466                }
467
468                // Spawn per-client reader thread (accepted peers don't reconnect)
469                let client_tx = tx.clone();
470                let client_name = iface_name.to_string();
471                thread::Builder::new()
472                    .name(format!("i2p-client-{}", client_id.0))
473                    .spawn(move || {
474                        peer_reader_loop(stream, client_id, &client_name, &client_tx);
475                        let _ = client_tx.send(Event::InterfaceDown(client_id));
476                    })
477                    .ok();
478            }
479            Err(e) => {
480                log::warn!("[{}] I2P accept failed: {}, retrying", iface_name, e);
481                thread::sleep(Duration::from_secs(1));
482            }
483        }
484    }
485}
486
487/// Per-peer HDLC reader loop. Reads from the SAM data stream, decodes HDLC
488/// frames, and sends them to the driver. Returns when the stream is closed.
489fn peer_reader_loop(
490    mut stream: std::net::TcpStream,
491    id: InterfaceId,
492    name: &str,
493    tx: &EventSender,
494) {
495    let mut decoder = hdlc::Decoder::new();
496    let mut buf = [0u8; 4096];
497
498    loop {
499        match stream.read(&mut buf) {
500            Ok(0) => {
501                log::info!("[{}] I2P peer {} disconnected", name, id.0);
502                return;
503            }
504            Ok(n) => {
505                for frame in decoder.feed(&buf[..n]) {
506                    if tx
507                        .send(Event::Frame {
508                            interface_id: id,
509                            data: frame,
510                        })
511                        .is_err()
512                    {
513                        return; // Driver shut down
514                    }
515                }
516            }
517            Err(e) => {
518                log::warn!("[{}] I2P peer {} read error: {}", name, id.0, e);
519                return;
520            }
521        }
522    }
523}
524
525// --- Factory implementation ---
526
527use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
528use std::collections::HashMap;
529
530/// Factory for `I2PInterface`.
531pub struct I2pFactory;
532
533impl InterfaceFactory for I2pFactory {
534    fn type_name(&self) -> &str {
535        "I2PInterface"
536    }
537
538    fn parse_config(
539        &self,
540        name: &str,
541        id: InterfaceId,
542        params: &HashMap<String, String>,
543    ) -> Result<Box<dyn InterfaceConfigData>, String> {
544        let sam_host = params
545            .get("sam_host")
546            .cloned()
547            .unwrap_or_else(|| "127.0.0.1".into());
548
549        let sam_port = params
550            .get("sam_port")
551            .and_then(|v| v.parse::<u16>().ok())
552            .unwrap_or(7656);
553
554        let connectable = params
555            .get("connectable")
556            .and_then(|v| crate::config::parse_bool_pub(v))
557            .unwrap_or(false);
558
559        let peers = params
560            .get("peers")
561            .map(|v| {
562                v.split(',')
563                    .map(|s| s.trim().to_string())
564                    .filter(|s| !s.is_empty())
565                    .collect::<Vec<String>>()
566            })
567            .unwrap_or_default();
568
569        let storage_dir = params
570            .get("storage_dir")
571            .map(PathBuf::from)
572            .unwrap_or_else(|| PathBuf::from("/tmp/rns-i2p"));
573
574        Ok(Box::new(I2pConfig {
575            name: name.to_string(),
576            interface_id: id,
577            sam_host,
578            sam_port,
579            connectable,
580            peers,
581            storage_dir,
582            ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
583            runtime: Arc::new(Mutex::new(I2pRuntime {
584                reconnect_wait: RECONNECT_WAIT,
585            })),
586        }))
587    }
588
589    fn start(
590        &self,
591        config: Box<dyn InterfaceConfigData>,
592        ctx: StartContext,
593    ) -> io::Result<StartResult> {
594        let mut cfg = *config
595            .into_any()
596            .downcast::<I2pConfig>()
597            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
598        cfg.ingress_control = ctx.ingress_control;
599        start(cfg, ctx.tx, ctx.next_dynamic_id)?;
600        Ok(StartResult::Listener { control: None })
601    }
602}
603
604pub(crate) fn i2p_runtime_handle_from_config(config: &I2pConfig) -> I2pRuntimeConfigHandle {
605    I2pRuntimeConfigHandle {
606        interface_name: config.name.clone(),
607        runtime: Arc::clone(&config.runtime),
608        startup: I2pRuntime::from_config(config),
609    }
610}