Skip to main content

rns_net/interface/i2p/
mod.rs

//! I2P interface using the SAM v3.1 protocol.
//!
//! Provides anonymous transport over the I2P network. The interface manages
//! both outbound peer connections and inbound connection acceptance.
//! Each peer connection becomes a dynamic interface with HDLC framing,
//! similar to the client connections of a TCP server.
//!
//! Matches Python `I2PInterface` from `I2PInterface.py`.
9
10pub mod sam;
11
12use std::io::{self, Read, Write};
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17use std::thread;
18use std::time::Duration;
19
20use rns_core::constants;
21use rns_core::transport::types::{InterfaceId, InterfaceInfo};
22
23use crate::event::{Event, EventSender};
24use crate::hdlc;
25use crate::interface::Writer;
26
27use self::sam::{Destination, SamError};
28
/// Hardware MTU, in bytes, for I2P streams (same value as the Python `I2PInterface`).
// The `dead_code` allowance is needed because the constant is declared for parity
// with the Python implementation but is not referenced yet.
#[allow(dead_code)]
const HW_MTU: usize = 1064;

/// Assumed throughput of an I2P tunnel in bits per second (256 kbps, as in Python).
const BITRATE_GUESS: u64 = 256_000;

/// Pause before trying to reach an outbound peer again.
const RECONNECT_WAIT: Duration = Duration::from_secs(15);
38
39/// Configuration for the I2P interface.
40#[derive(Debug, Clone)]
41pub struct I2pConfig {
42    pub name: String,
43    pub interface_id: InterfaceId,
44    /// SAM bridge host (default "127.0.0.1").
45    pub sam_host: String,
46    /// SAM bridge port (default 7656).
47    pub sam_port: u16,
48    /// List of .b32.i2p peer addresses (or full base64 destinations) to connect to.
49    pub peers: Vec<String>,
50    /// Whether to accept inbound connections.
51    pub connectable: bool,
52    /// Directory for key persistence (typically `{config_dir}/storage`).
53    pub storage_dir: PathBuf,
54}
55
56impl Default for I2pConfig {
57    fn default() -> Self {
58        I2pConfig {
59            name: String::new(),
60            interface_id: InterfaceId(0),
61            sam_host: "127.0.0.1".into(),
62            sam_port: 7656,
63            peers: Vec::new(),
64            connectable: false,
65            storage_dir: PathBuf::from("."),
66        }
67    }
68}
69
70/// Writer that sends HDLC-framed data over an I2P SAM stream.
71struct I2pWriter {
72    stream: std::net::TcpStream,
73}
74
75impl Writer for I2pWriter {
76    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
77        self.stream.write_all(&hdlc::frame(data))
78    }
79}
80
/// Makes `name` safe to embed in a file name or SAM session ID.
///
/// Alphanumeric characters (Unicode-aware, per `char::is_alphanumeric`), `-` and `_`
/// are kept; every other character is replaced by `_`. The result has the same number
/// of characters as the input.
fn sanitize_name(name: &str) -> String {
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = ch.is_alphanumeric() || matches!(ch, '-' | '_');
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
87
88/// Key file path for the given interface name.
89fn key_file_path(storage_dir: &PathBuf, name: &str) -> PathBuf {
90    storage_dir.join(format!("i2p_{}.key", sanitize_name(name)))
91}
92
93/// Load or generate an I2P keypair. Persists the private key to disk
94/// so the node keeps a stable I2P address across restarts.
95fn load_or_generate_keypair(
96    sam_addr: &SocketAddr,
97    storage_dir: &PathBuf,
98    name: &str,
99) -> Result<sam::KeyPair, SamError> {
100    let key_path = key_file_path(storage_dir, name);
101
102    if key_path.exists() {
103        // Load existing private key
104        let priv_data = std::fs::read(&key_path).map_err(SamError::Io)?;
105
106        // We need the public destination too. Create a temporary session to extract it,
107        // or we can derive it by creating a session and reading back the destination.
108        // Simpler: store the full keypair. Let's re-generate from the private key
109        // by creating a session (the destination is embedded in the private key).
110        // Actually, the I2P private key blob contains the destination as a prefix.
111        // We can extract the destination from the private key.
112        // The destination is the first portion of the private key blob.
113        // For simplicity, store the full PRIV blob (which includes the destination).
114
115        // The PRIV blob from DEST GENERATE contains both the destination and private keys.
116        // The destination is extractable as the first ~387 bytes, but the exact size
117        // depends on the key type. For Ed25519 (sig type 7), the destination is:
118        //   256 bytes (encryption public key) + 128 bytes (signing public key) + 3 bytes (certificate)
119        // = 387 bytes. But this can vary with certificate content.
120
121        // Rather than parsing, we pass the full PRIV to SESSION CREATE (which accepts it)
122        // and then do a NAMING LOOKUP on "ME" to get our destination.
123
124        Ok(sam::KeyPair {
125            destination: Destination { data: Vec::new() }, // filled in later
126            private_key: priv_data,
127        })
128    } else {
129        // Generate new keypair
130        log::info!("[{}] generating new I2P destination keypair", name);
131        let keypair = sam::dest_generate(sam_addr)?;
132
133        // Save private key to disk
134        if let Some(parent) = key_path.parent() {
135            std::fs::create_dir_all(parent).map_err(SamError::Io)?;
136        }
137        std::fs::write(&key_path, &keypair.private_key).map_err(SamError::Io)?;
138        log::info!("[{}] saved I2P key to {:?}", name, key_path);
139
140        Ok(keypair)
141    }
142}
143
144/// Start the I2P interface coordinator. All peer connections are registered
145/// as dynamic interfaces via InterfaceUp events.
146pub fn start(
147    config: I2pConfig,
148    tx: EventSender,
149    next_id: Arc<AtomicU64>,
150) -> io::Result<()> {
151    let name = config.name.clone();
152
153    thread::Builder::new()
154        .name(format!("i2p-coord-{}", config.interface_id.0))
155        .spawn(move || {
156            if let Err(e) = coordinator(config, tx, next_id) {
157                log::error!("[{}] I2P coordinator failed: {}", name, e);
158            }
159        })?;
160
161    Ok(())
162}
163
164/// Coordinator thread: sets up SAM sessions and spawns peer threads.
165fn coordinator(
166    config: I2pConfig,
167    tx: EventSender,
168    next_id: Arc<AtomicU64>,
169) -> Result<(), SamError> {
170    let sam_addr: SocketAddr = format!("{}:{}", config.sam_host, config.sam_port)
171        .parse()
172        .map_err(|e| SamError::Io(io::Error::new(io::ErrorKind::InvalidInput, e)))?;
173
174    // Load or generate keypair
175    let keypair = load_or_generate_keypair(&sam_addr, &config.storage_dir, &config.name)?;
176    let priv_b64 = sam::i2p_base64_encode(&keypair.private_key);
177
178    // We use a single session for all streams (outbound + inbound).
179    // Session ID must be unique; use the interface name sanitized.
180    let session_id = sanitize_name(&config.name);
181
182    log::info!("[{}] creating SAM session (id={})", config.name, session_id);
183    let mut control_socket = sam::session_create(&sam_addr, &session_id, &priv_b64)?;
184
185    // Look up our own destination via NAMING LOOKUP "ME" on the session control socket.
186    // "ME" requires a session context on the same connection.
187    match sam::naming_lookup_on(&mut control_socket, "ME") {
188        Ok(our_dest) => {
189            let b32 = our_dest.base32_address();
190            log::info!("[{}] I2P address: {}", config.name, b32);
191        }
192        Err(e) => {
193            log::warn!("[{}] could not look up own destination: {}", config.name, e);
194        }
195    }
196
197    // Spawn outbound peer threads
198    for peer_addr in &config.peers {
199        let peer_addr = peer_addr.trim().to_string();
200        if peer_addr.is_empty() {
201            continue;
202        }
203
204        let tx2 = tx.clone();
205        let next_id2 = next_id.clone();
206        let sam_addr2 = sam_addr;
207        let session_id2 = session_id.clone();
208        let iface_name = config.name.clone();
209
210        thread::Builder::new()
211            .name(format!("i2p-out-{}", peer_addr))
212            .spawn(move || {
213                outbound_peer_loop(
214                    sam_addr2,
215                    &session_id2,
216                    &peer_addr,
217                    &iface_name,
218                    tx2,
219                    next_id2,
220                );
221            })
222            .ok();
223    }
224
225    // Spawn acceptor thread if connectable
226    if config.connectable {
227        let tx2 = tx.clone();
228        let next_id2 = next_id.clone();
229        let sam_addr2 = sam_addr;
230        let session_id2 = session_id.clone();
231        let iface_name = config.name.clone();
232
233        thread::Builder::new()
234            .name("i2p-acceptor".into())
235            .spawn(move || {
236                acceptor_loop(sam_addr2, &session_id2, &iface_name, tx2, next_id2);
237            })
238            .ok();
239    }
240
241    // The control socket (and this thread) must remain alive for the session's lifetime.
242    // We park the coordinator thread here. If control_socket drops, the session ends.
243    let _keep_alive = control_socket;
244    loop {
245        thread::sleep(Duration::from_secs(3600));
246    }
247}
248
249/// Outbound peer thread: connects to a remote I2P destination, runs HDLC reader loop.
250/// Reconnects on failure.
251fn outbound_peer_loop(
252    sam_addr: SocketAddr,
253    session_id: &str,
254    peer_addr: &str,
255    iface_name: &str,
256    tx: EventSender,
257    next_id: Arc<AtomicU64>,
258) {
259    loop {
260        log::info!("[{}] connecting to I2P peer {}", iface_name, peer_addr);
261
262        // Resolve .b32.i2p address if needed
263        let destination = if peer_addr.ends_with(".i2p") {
264            match sam::naming_lookup(&sam_addr, peer_addr) {
265                Ok(dest) => dest.to_i2p_base64(),
266                Err(e) => {
267                    log::warn!(
268                        "[{}] failed to resolve {}: {}",
269                        iface_name, peer_addr, e
270                    );
271                    thread::sleep(RECONNECT_WAIT);
272                    continue;
273                }
274            }
275        } else {
276            // Assume it's already a full base64 destination
277            peer_addr.to_string()
278        };
279
280        // Connect via SAM
281        match sam::stream_connect(&sam_addr, session_id, &destination) {
282            Ok(stream) => {
283                let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
284
285                log::info!(
286                    "[{}] connected to I2P peer {} → id {}",
287                    iface_name, peer_addr, client_id.0
288                );
289
290                // Clone stream for writer/reader split
291                let writer_stream = match stream.try_clone() {
292                    Ok(s) => s,
293                    Err(e) => {
294                        log::warn!("[{}] failed to clone stream: {}", iface_name, e);
295                        thread::sleep(RECONNECT_WAIT);
296                        continue;
297                    }
298                };
299
300                let writer: Box<dyn Writer> = Box::new(I2pWriter { stream: writer_stream });
301
302                let info = InterfaceInfo {
303                    id: client_id,
304                    name: format!("I2PInterface/{}", peer_addr),
305                    mode: constants::MODE_FULL,
306                    out_capable: true,
307                    in_capable: true,
308                    bitrate: Some(BITRATE_GUESS),
309                    announce_rate_target: None,
310                    announce_rate_grace: 0,
311                    announce_rate_penalty: 0.0,
312                    announce_cap: constants::ANNOUNCE_CAP,
313                    is_local_client: false,
314                    wants_tunnel: false,
315                    tunnel_id: None,
316                    mtu: 65535,
317                    ia_freq: 0.0,
318                    started: 0.0,
319                    ingress_control: true,
320                };
321
322                // Register dynamic interface
323                if tx
324                    .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
325                    .is_err()
326                {
327                    return; // Driver shut down
328                }
329
330                // Run HDLC reader loop (blocks until disconnect)
331                peer_reader_loop(stream, client_id, iface_name, &tx);
332
333                // Disconnected
334                let _ = tx.send(Event::InterfaceDown(client_id));
335                log::warn!(
336                    "[{}] I2P peer {} disconnected, reconnecting in {}s",
337                    iface_name,
338                    peer_addr,
339                    RECONNECT_WAIT.as_secs()
340                );
341            }
342            Err(e) => {
343                log::warn!(
344                    "[{}] failed to connect to I2P peer {}: {}",
345                    iface_name, peer_addr, e
346                );
347            }
348        }
349
350        thread::sleep(RECONNECT_WAIT);
351    }
352}
353
354/// Acceptor thread: loops accepting inbound connections on the session.
355fn acceptor_loop(
356    sam_addr: SocketAddr,
357    session_id: &str,
358    iface_name: &str,
359    tx: EventSender,
360    next_id: Arc<AtomicU64>,
361) {
362    loop {
363        match sam::stream_accept(&sam_addr, session_id) {
364            Ok((stream, remote_dest)) => {
365                let client_id = InterfaceId(next_id.fetch_add(1, Ordering::Relaxed));
366                let remote_b32 = remote_dest.base32_address();
367
368                log::info!(
369                    "[{}] accepted I2P connection from {} → id {}",
370                    iface_name, remote_b32, client_id.0
371                );
372
373                // Clone stream for writer/reader split
374                let writer_stream = match stream.try_clone() {
375                    Ok(s) => s,
376                    Err(e) => {
377                        log::warn!("[{}] failed to clone accepted stream: {}", iface_name, e);
378                        continue;
379                    }
380                };
381
382                let writer: Box<dyn Writer> = Box::new(I2pWriter { stream: writer_stream });
383
384                let info = InterfaceInfo {
385                    id: client_id,
386                    name: format!("I2PInterface/{}", remote_b32),
387                    mode: constants::MODE_FULL,
388                    out_capable: true,
389                    in_capable: true,
390                    bitrate: Some(BITRATE_GUESS),
391                    announce_rate_target: None,
392                    announce_rate_grace: 0,
393                    announce_rate_penalty: 0.0,
394                    announce_cap: constants::ANNOUNCE_CAP,
395                    is_local_client: false,
396                    wants_tunnel: false,
397                    tunnel_id: None,
398                    mtu: 65535,
399                    ia_freq: 0.0,
400                    started: 0.0,
401                    ingress_control: true,
402                };
403
404                // Register dynamic interface
405                if tx
406                    .send(Event::InterfaceUp(client_id, Some(writer), Some(info)))
407                    .is_err()
408                {
409                    return; // Driver shut down
410                }
411
412                // Spawn per-client reader thread (accepted peers don't reconnect)
413                let client_tx = tx.clone();
414                let client_name = iface_name.to_string();
415                thread::Builder::new()
416                    .name(format!("i2p-client-{}", client_id.0))
417                    .spawn(move || {
418                        peer_reader_loop(stream, client_id, &client_name, &client_tx);
419                        let _ = client_tx.send(Event::InterfaceDown(client_id));
420                    })
421                    .ok();
422            }
423            Err(e) => {
424                log::warn!("[{}] I2P accept failed: {}, retrying", iface_name, e);
425                thread::sleep(Duration::from_secs(1));
426            }
427        }
428    }
429}
430
431/// Per-peer HDLC reader loop. Reads from the SAM data stream, decodes HDLC
432/// frames, and sends them to the driver. Returns when the stream is closed.
433fn peer_reader_loop(
434    mut stream: std::net::TcpStream,
435    id: InterfaceId,
436    name: &str,
437    tx: &EventSender,
438) {
439    let mut decoder = hdlc::Decoder::new();
440    let mut buf = [0u8; 4096];
441
442    loop {
443        match stream.read(&mut buf) {
444            Ok(0) => {
445                log::info!("[{}] I2P peer {} disconnected", name, id.0);
446                return;
447            }
448            Ok(n) => {
449                for frame in decoder.feed(&buf[..n]) {
450                    if tx
451                        .send(Event::Frame {
452                            interface_id: id,
453                            data: frame,
454                        })
455                        .is_err()
456                    {
457                        return; // Driver shut down
458                    }
459                }
460            }
461            Err(e) => {
462                log::warn!("[{}] I2P peer {} read error: {}", name, id.0, e);
463                return;
464            }
465        }
466    }
467}