Skip to main content

rns_net/interface/i2p/
mod.rs

1//! I2P interface using the SAM v3.1 protocol.
2//!
3//! Provides anonymous transport over the I2P network. The interface manages
4//! both outbound peer connections and inbound connection acceptance.
5//! Each peer connection becomes a dynamic interface with HDLC framing,
6//! similar to TCP server client connections.
7//!
8//! Matches Python `I2PInterface` from `I2PInterface.py`.
9
10pub mod sam;
11
12use std::io::{self, Read, Write};
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17use std::thread;
18use std::time::Duration;
19
20use rns_core::constants;
21use rns_core::transport::types::{InterfaceId, InterfaceInfo};
22
23use crate::event::{Event, EventSender};
24use crate::hdlc;
25use crate::interface::Writer;
26
27use self::sam::{Destination, SamError};
28
29/// Hardware MTU for I2P streams (matches Python I2PInterface).
30#[allow(dead_code)]
31const HW_MTU: usize = 1064;
32
33/// Estimated bitrate for I2P tunnels (256 kbps, matches Python).
34const BITRATE_GUESS: u64 = 256_000;
35
36/// Wait time before reconnecting to an outbound peer.
37const RECONNECT_WAIT: Duration = Duration::from_secs(15);
38
39/// Configuration for the I2P interface.
40#[derive(Debug, Clone)]
41pub struct I2pConfig {
42    pub name: String,
43    pub interface_id: InterfaceId,
44    /// SAM bridge host (default "127.0.0.1").
45    pub sam_host: String,
46    /// SAM bridge port (default 7656).
47    pub sam_port: u16,
48    /// List of .b32.i2p peer addresses (or full base64 destinations) to connect to.
49    pub peers: Vec<String>,
50    /// Whether to accept inbound connections.
51    pub connectable: bool,
52    /// Directory for key persistence (typically `{config_dir}/storage`).
53    pub storage_dir: PathBuf,
54}
55
56impl Default for I2pConfig {
57    fn default() -> Self {
58        I2pConfig {
59            name: String::new(),
60            interface_id: InterfaceId(0),
61            sam_host: "127.0.0.1".into(),
62            sam_port: 7656,
63            peers: Vec::new(),
64            connectable: false,
65            storage_dir: PathBuf::from("."),
66        }
67    }
68}
69
70/// Writer that sends HDLC-framed data over an I2P SAM stream.
71struct I2pWriter {
72    stream: std::net::TcpStream,
73}
74
75impl Writer for I2pWriter {
76    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
77        self.stream.write_all(&hdlc::frame(data))
78    }
79}
80
81/// Sanitize a name for use in filenames and SAM session IDs.
82fn sanitize_name(name: &str) -> String {
83    name.chars()
84        .map(|c| {
85            if c.is_alphanumeric() || c == '-' || c == '_' {
86                c
87            } else {
88                '_'
89            }
90        })
91        .collect()
92}
93
94/// Key file path for the given interface name.
95fn key_file_path(storage_dir: &PathBuf, name: &str) -> PathBuf {
96    storage_dir.join(format!("i2p_{}.key", sanitize_name(name)))
97}
98
99/// Load or generate an I2P keypair. Persists the private key to disk
100/// so the node keeps a stable I2P address across restarts.
101fn load_or_generate_keypair(
102    sam_addr: &SocketAddr,
103    storage_dir: &PathBuf,
104    name: &str,
105) -> Result<sam::KeyPair, SamError> {
106    let key_path = key_file_path(storage_dir, name);
107
108    if key_path.exists() {
109        // Load existing private key
110        let priv_data = std::fs::read(&key_path).map_err(SamError::Io)?;
111
112        // We need the public destination too. Create a temporary session to extract it,
113        // or we can derive it by creating a session and reading back the destination.
114        // Simpler: store the full keypair. Let's re-generate from the private key
115        // by creating a session (the destination is embedded in the private key).
116        // Actually, the I2P private key blob contains the destination as a prefix.
117        // We can extract the destination from the private key.
118        // The destination is the first portion of the private key blob.
119        // For simplicity, store the full PRIV blob (which includes the destination).
120
121        // The PRIV blob from DEST GENERATE contains both the destination and private keys.
122        // The destination is extractable as the first ~387 bytes, but the exact size
123        // depends on the key type. For Ed25519 (sig type 7), the destination is:
124        //   256 bytes (encryption public key) + 128 bytes (signing public key) + 3 bytes (certificate)
125        // = 387 bytes. But this can vary with certificate content.
126
127        // Rather than parsing, we pass the full PRIV to SESSION CREATE (which accepts it)
128        // and then do a NAMING LOOKUP on "ME" to get our destination.
129
130        Ok(sam::KeyPair {
131            destination: Destination { data: Vec::new() }, // filled in later
132            private_key: priv_data,
133        })
134    } else {
135        // Generate new keypair
136        log::info!("[{}] generating new I2P destination keypair", name);
137        let keypair = sam::dest_generate(sam_addr)?;
138
139        // Save private key to disk
140        if let Some(parent) = key_path.parent() {
141            std::fs::create_dir_all(parent).map_err(SamError::Io)?;
142        }
143        std::fs::write(&key_path, &keypair.private_key).map_err(SamError::Io)?;
144        log::info!("[{}] saved I2P key to {:?}", name, key_path);
145
146        Ok(keypair)
147    }
148}
149
150/// Start the I2P interface coordinator. All peer connections are registered
151/// as dynamic interfaces via InterfaceUp events.
152pub fn start(config: I2pConfig, tx: EventSender, next_id: Arc<AtomicU64>) -> io::Result<()> {
153    let name = config.name.clone();
154
155    thread::Builder::new()
156        .name(format!("i2p-coord-{}", config.interface_id.0))
157        .spawn(move || {
158            if let Err(e) = coordinator(config, tx, next_id) {
159                log::error!("[{}] I2P coordinator failed: {}", name, e);
160            }
161        })?;
162
163    Ok(())
164}
165
166/// Coordinator thread: sets up SAM sessions and spawns peer threads.
167fn coordinator(
168    config: I2pConfig,
169    tx: EventSender,
170    next_id: Arc<AtomicU64>,
171) -> Result<(), SamError> {
172    let sam_addr: SocketAddr = format!("{}:{}", config.sam_host, config.sam_port)
173        .parse()
174        .map_err(|e| SamError::Io(io::Error::new(io::ErrorKind::InvalidInput, e)))?;
175
176    // Load or generate keypair
177    let keypair = load_or_generate_keypair(&sam_addr, &config.storage_dir, &config.name)?;
178    let priv_b64 = sam::i2p_base64_encode(&keypair.private_key);
179
180    // We use a single session for all streams (outbound + inbound).
181    // Session ID must be unique; use the interface name sanitized.
182    let session_id = sanitize_name(&config.name);
183
184    log::info!("[{}] creating SAM session (id={})", config.name, session_id);
185    let mut control_socket = sam::session_create(&sam_addr, &session_id, &priv_b64)?;
186
187    // Look up our own destination via NAMING LOOKUP "ME" on the session control socket.
188    // "ME" requires a session context on the same connection.
189    match sam::naming_lookup_on(&mut control_socket, "ME") {
190        Ok(our_dest) => {
191            let b32 = our_dest.base32_address();
192            log::info!("[{}] I2P address: {}", config.name, b32);
193        }
194        Err(e) => {
195            log::warn!("[{}] could not look up own destination: {}", config.name, e);
196        }
197    }
198
199    // Spawn outbound peer threads
200    for peer_addr in &config.peers {
201        let peer_addr = peer_addr.trim().to_string();
202        if peer_addr.is_empty() {
203            continue;
204        }
205
206        let tx2 = tx.clone();
207        let next_id2 = next_id.clone();
208        let sam_addr2 = sam_addr;
209        let session_id2 = session_id.clone();
210        let iface_name = config.name.clone();
211
212        thread::Builder::new()
213            .name(format!("i2p-out-{}", peer_addr))
214            .spawn(move || {
215                outbound_peer_loop(
216                    sam_addr2,
217                    &session_id2,
218                    &peer_addr,
219                    &iface_name,
220                    tx2,
221                    next_id2,
222                );
223            })
224            .ok();
225    }
226
227    // Spawn acceptor thread if connectable
228    if config.connectable {
229        let tx2 = tx.clone();
230        let next_id2 = next_id.clone();
231        let sam_addr2 = sam_addr;
232        let session_id2 = session_id.clone();
233        let iface_name = config.name.clone();
234
235        thread::Builder::new()
236            .name("i2p-acceptor".into())
237            .spawn(move || {
238                acceptor_loop(sam_addr2, &session_id2, &iface_name, tx2, next_id2);
239            })
240            .ok();
241    }
242
243    // The control socket (and this thread) must remain alive for the session's lifetime.
244    // We park the coordinator thread here. If control_socket drops, the session ends.
245    let _keep_alive = control_socket;
246    loop {
247        thread::sleep(Duration::from_secs(3600));
248    }
249}
250
251/// Outbound peer thread: connects to a remote I2P destination, runs HDLC reader loop.
252/// Reconnects on failure.
253fn outbound_peer_loop(
254    sam_addr: SocketAddr,
255    session_id: &str,
256    peer_addr: &str,
257    iface_name: &str,
258    tx: EventSender,
259    next_id: Arc<AtomicU64>,
260) {
261    loop {
262        log::info!("[{}] connecting to I2P peer {}", iface_name, peer_addr);
263
264        // Resolve .b32.i2p address if needed
265        let destination = if peer_addr.ends_with(".i2p") {
266            match sam::naming_lookup(&sam_addr, peer_addr) {
267                Ok(dest) => dest.to_i2p_base64(),
268                Err(e) => {
269                    log::warn!("[{}] failed to resolve {}: {}", iface_name, peer_addr, e);
270                    thread::sleep(RECONNECT_WAIT);
271                    continue;
272                }
273            }
274        } else {
275            // Assume it's already a full base64 destination
276            peer_addr.to_string()
277        };
278
279        // Connect via SAM
280        match sam::stream_connect(&sam_addr, session_id, &destination) {
281            Ok(stream) => {
282                let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
283
284                log::info!(
285                    "[{}] connected to I2P peer {} → id {}",
286                    iface_name,
287                    peer_addr,
288                    client_id.0
289                );
290
291                // Clone stream for writer/reader split
292                let writer_stream = match stream.try_clone() {
293                    Ok(s) => s,
294                    Err(e) => {
295                        log::warn!("[{}] failed to clone stream: {}", iface_name, e);
296                        thread::sleep(RECONNECT_WAIT);
297                        continue;
298                    }
299                };
300
301                let writer: Box<dyn Writer> = Box::new(I2pWriter {
302                    stream: writer_stream,
303                });
304
305                let info = InterfaceInfo {
306                    id: client_id,
307                    name: format!("I2PInterface/{}", peer_addr),
308                    mode: constants::MODE_FULL,
309                    out_capable: true,
310                    in_capable: true,
311                    bitrate: Some(BITRATE_GUESS),
312                    announce_rate_target: None,
313                    announce_rate_grace: 0,
314                    announce_rate_penalty: 0.0,
315                    announce_cap: constants::ANNOUNCE_CAP,
316                    is_local_client: false,
317                    wants_tunnel: false,
318                    tunnel_id: None,
319                    mtu: 65535,
320                    ia_freq: 0.0,
321                    started: 0.0,
322                    ingress_control: true,
323                };
324
325                // Register dynamic interface
326                if tx
327                    .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
328                    .is_err()
329                {
330                    return; // Driver shut down
331                }
332
333                // Run HDLC reader loop (blocks until disconnect)
334                peer_reader_loop(stream, client_id, iface_name, &tx);
335
336                // Disconnected
337                let _ = tx.send(Event::InterfaceDown(client_id));
338                log::warn!(
339                    "[{}] I2P peer {} disconnected, reconnecting in {}s",
340                    iface_name,
341                    peer_addr,
342                    RECONNECT_WAIT.as_secs()
343                );
344            }
345            Err(e) => {
346                log::warn!(
347                    "[{}] failed to connect to I2P peer {}: {}",
348                    iface_name,
349                    peer_addr,
350                    e
351                );
352            }
353        }
354
355        thread::sleep(RECONNECT_WAIT);
356    }
357}
358
359/// Acceptor thread: loops accepting inbound connections on the session.
360fn acceptor_loop(
361    sam_addr: SocketAddr,
362    session_id: &str,
363    iface_name: &str,
364    tx: EventSender,
365    next_id: Arc<AtomicU64>,
366) {
367    loop {
368        match sam::stream_accept(&sam_addr, session_id) {
369            Ok((stream, remote_dest)) => {
370                let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
371                let remote_b32 = remote_dest.base32_address();
372
373                log::info!(
374                    "[{}] accepted I2P connection from {} → id {}",
375                    iface_name,
376                    remote_b32,
377                    client_id.0
378                );
379
380                // Clone stream for writer/reader split
381                let writer_stream = match stream.try_clone() {
382                    Ok(s) => s,
383                    Err(e) => {
384                        log::warn!("[{}] failed to clone accepted stream: {}", iface_name, e);
385                        continue;
386                    }
387                };
388
389                let writer: Box<dyn Writer> = Box::new(I2pWriter {
390                    stream: writer_stream,
391                });
392
393                let info = InterfaceInfo {
394                    id: client_id,
395                    name: format!("I2PInterface/{}", remote_b32),
396                    mode: constants::MODE_FULL,
397                    out_capable: true,
398                    in_capable: true,
399                    bitrate: Some(BITRATE_GUESS),
400                    announce_rate_target: None,
401                    announce_rate_grace: 0,
402                    announce_rate_penalty: 0.0,
403                    announce_cap: constants::ANNOUNCE_CAP,
404                    is_local_client: false,
405                    wants_tunnel: false,
406                    tunnel_id: None,
407                    mtu: 65535,
408                    ia_freq: 0.0,
409                    started: 0.0,
410                    ingress_control: true,
411                };
412
413                // Register dynamic interface
414                if tx
415                    .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
416                    .is_err()
417                {
418                    return; // Driver shut down
419                }
420
421                // Spawn per-client reader thread (accepted peers don't reconnect)
422                let client_tx = tx.clone();
423                let client_name = iface_name.to_string();
424                thread::Builder::new()
425                    .name(format!("i2p-client-{}", client_id.0))
426                    .spawn(move || {
427                        peer_reader_loop(stream, client_id, &client_name, &client_tx);
428                        let _ = client_tx.send(Event::InterfaceDown(client_id));
429                    })
430                    .ok();
431            }
432            Err(e) => {
433                log::warn!("[{}] I2P accept failed: {}, retrying", iface_name, e);
434                thread::sleep(Duration::from_secs(1));
435            }
436        }
437    }
438}
439
440/// Per-peer HDLC reader loop. Reads from the SAM data stream, decodes HDLC
441/// frames, and sends them to the driver. Returns when the stream is closed.
442fn peer_reader_loop(
443    mut stream: std::net::TcpStream,
444    id: InterfaceId,
445    name: &str,
446    tx: &EventSender,
447) {
448    let mut decoder = hdlc::Decoder::new();
449    let mut buf = [0u8; 4096];
450
451    loop {
452        match stream.read(&mut buf) {
453            Ok(0) => {
454                log::info!("[{}] I2P peer {} disconnected", name, id.0);
455                return;
456            }
457            Ok(n) => {
458                for frame in decoder.feed(&buf[..n]) {
459                    if tx
460                        .send(Event::Frame {
461                            interface_id: id,
462                            data: frame,
463                        })
464                        .is_err()
465                    {
466                        return; // Driver shut down
467                    }
468                }
469            }
470            Err(e) => {
471                log::warn!("[{}] I2P peer {} read error: {}", name, id.0, e);
472                return;
473            }
474        }
475    }
476}
477
478// --- Factory implementation ---
479
480use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
481use std::collections::HashMap;
482
483/// Factory for `I2PInterface`.
484pub struct I2pFactory;
485
486impl InterfaceFactory for I2pFactory {
487    fn type_name(&self) -> &str {
488        "I2PInterface"
489    }
490
491    fn parse_config(
492        &self,
493        name: &str,
494        id: InterfaceId,
495        params: &HashMap<String, String>,
496    ) -> Result<Box<dyn InterfaceConfigData>, String> {
497        let sam_host = params
498            .get("sam_host")
499            .cloned()
500            .unwrap_or_else(|| "127.0.0.1".into());
501
502        let sam_port = params
503            .get("sam_port")
504            .and_then(|v| v.parse::<u16>().ok())
505            .unwrap_or(7656);
506
507        let connectable = params
508            .get("connectable")
509            .and_then(|v| crate::config::parse_bool_pub(v))
510            .unwrap_or(false);
511
512        let peers = params
513            .get("peers")
514            .map(|v| {
515                v.split(',')
516                    .map(|s| s.trim().to_string())
517                    .filter(|s| !s.is_empty())
518                    .collect::<Vec<String>>()
519            })
520            .unwrap_or_default();
521
522        let storage_dir = params
523            .get("storage_dir")
524            .map(PathBuf::from)
525            .unwrap_or_else(|| PathBuf::from("/tmp/rns-i2p"));
526
527        Ok(Box::new(I2pConfig {
528            name: name.to_string(),
529            interface_id: id,
530            sam_host,
531            sam_port,
532            connectable,
533            peers,
534            storage_dir,
535        }))
536    }
537
538    fn start(
539        &self,
540        config: Box<dyn InterfaceConfigData>,
541        ctx: StartContext,
542    ) -> io::Result<StartResult> {
543        let cfg = *config
544            .into_any()
545            .downcast::<I2pConfig>()
546            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
547        start(cfg, ctx.tx, ctx.next_dynamic_id)?;
548        Ok(StartResult::Listener)
549    }
550}