use std::collections::HashMap;
use std::fmt;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tracing::{debug, info};
use super::destination::I2pDestination;
/// Errors produced while talking to a SAM bridge.
///
/// Every variant carries a human-readable detail string that is appended to
/// the fixed prefix given in its `#[error]` attribute.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SamError {
/// The TCP connection to the SAM bridge could not be opened.
#[error("SAM connection failed: {0}")]
ConnectionFailed(String),
/// The reply to the `HELLO VERSION` handshake was not an OK reply.
#[error("SAM handshake failed: {0}")]
HandshakeFailed(String),
/// `SESSION CREATE` was rejected, or its reply lacked a `DESTINATION`.
#[error("SAM session creation failed: {0}")]
SessionCreateFailed(String),
/// `STREAM CONNECT` did not answer with an OK `STREAM STATUS`.
#[error("SAM stream connect failed: {0}")]
StreamConnectFailed(String),
/// `STREAM ACCEPT` did not answer with an OK `STREAM STATUS`.
#[error("SAM stream accept failed: {0}")]
StreamAcceptFailed(String),
/// `NAMING LOOKUP` failed, or its reply lacked a `VALUE`.
#[error("SAM naming lookup failed: {0}")]
NamingLookupFailed(String),
/// A reply line could not be parsed (for example it was empty or had no
/// second token).
#[error("SAM protocol error: {0}")]
ProtocolError(String),
/// An I/O error, stored as its display text (`std::io::Error` itself is
/// not `Clone`).
#[error("SAM I/O error: {0}")]
IoError(String),
/// A base64 destination received from the bridge could not be decoded.
#[error("SAM invalid destination: {0}")]
InvalidDestination(String),
}
impl From<std::io::Error> for SamError {
fn from(e: std::io::Error) -> Self {
Self::IoError(e.to_string())
}
}
/// One parsed SAM reply line, e.g. `HELLO REPLY RESULT=OK VERSION=3.1`.
#[derive(Debug, Clone)]
pub(crate) struct SamReply {
/// First space-delimited token of the line (`HELLO` in the example above).
pub major: String,
/// Second space-delimited token of the line (`REPLY` in the example above).
pub minor: String,
/// `KEY=VALUE` pairs parsed from the remainder of the line; empty when the
/// line has only two tokens.
pub pairs: HashMap<String, String>,
}
impl SamReply {
pub fn parse(line: &str) -> Result<Self, SamError> {
let line = line.trim();
let mut parts = line.splitn(3, ' ');
let major = parts
.next()
.filter(|s| !s.is_empty())
.ok_or_else(|| SamError::ProtocolError("empty reply".into()))?
.to_string();
let minor = parts
.next()
.ok_or_else(|| SamError::ProtocolError(format!("missing minor token in: {line}")))?
.to_string();
let mut pairs = HashMap::new();
if let Some(rest) = parts.next() {
parse_key_value_pairs(rest, &mut pairs);
}
Ok(Self {
major,
minor,
pairs,
})
}
pub fn is_ok(&self) -> bool {
self.pairs.get("RESULT").is_some_and(|v| v == "OK")
}
pub fn result(&self) -> &str {
self.pairs
.get("RESULT")
.map_or("UNKNOWN", std::string::String::as_str)
}
pub fn message(&self) -> Option<&str> {
self.pairs.get("MESSAGE").map(std::string::String::as_str)
}
}
/// Parses space-separated `KEY=VALUE` pairs from `s` into `pairs`.
///
/// * Runs of spaces between pairs are skipped.
/// * A token without `=` is ignored.
/// * A value that starts with `"` extends to the next `"` (or to the end of
///   the input if the quote is never closed) and may contain spaces; the
///   quotes themselves are not stored. No escape sequences are recognised.
/// * An unquoted value ends at the next space and may be empty.
/// * A later occurrence of a key overwrites an earlier one.
fn parse_key_value_pairs(s: &str, pairs: &mut HashMap<String, String>) {
    // All delimiters are ASCII, so every index used for slicing below is a
    // valid UTF-8 character boundary.
    let mut rest = s;
    loop {
        rest = rest.trim_start_matches(' ');
        if rest.is_empty() {
            return;
        }

        // The key runs up to the first '=' or ' ', whichever comes first.
        let eq_at = match rest.find(|c: char| c == '=' || c == ' ') {
            Some(idx) if rest.as_bytes()[idx] == b'=' => idx,
            Some(idx) => {
                // Bare token followed by a space: drop it and move on.
                rest = &rest[idx..];
                continue;
            }
            // Bare token at the very end of the input.
            None => return,
        };
        let key = &rest[..eq_at];
        rest = &rest[eq_at + 1..];

        let value = if let Some(quoted) = rest.strip_prefix('"') {
            match quoted.split_once('"') {
                Some((inside, after)) => {
                    rest = after;
                    inside
                }
                None => {
                    // Unterminated quote: the value is everything that is left.
                    rest = "";
                    quoted
                }
            }
        } else {
            match rest.find(' ') {
                Some(end) => {
                    let (plain, after) = rest.split_at(end);
                    rest = after;
                    plain
                }
                None => {
                    let plain = rest;
                    rest = "";
                    plain
                }
            }
        };

        pairs.insert(key.to_string(), value.to_string());
    }
}
/// Tunnel options that are appended to the `SESSION CREATE` command.
///
/// Each field is emitted verbatim as the option named in its documentation;
/// see [`SamTunnelConfig::to_sam_options`].
#[derive(Debug, Clone)]
pub struct SamTunnelConfig {
    /// Emitted as `inbound.quantity`.
    pub inbound_quantity: u8,
    /// Emitted as `outbound.quantity`.
    pub outbound_quantity: u8,
    /// Emitted as `inbound.length`.
    pub inbound_length: u8,
    /// Emitted as `outbound.length`.
    pub outbound_length: u8,
}

impl Default for SamTunnelConfig {
    /// All four options default to `3`.
    fn default() -> Self {
        const DEFAULT_SETTING: u8 = 3;
        SamTunnelConfig {
            inbound_quantity: DEFAULT_SETTING,
            outbound_quantity: DEFAULT_SETTING,
            inbound_length: DEFAULT_SETTING,
            outbound_length: DEFAULT_SETTING,
        }
    }
}

impl SamTunnelConfig {
    /// Renders the configuration as space-separated `key=value` options in
    /// the order inbound quantity, outbound quantity, inbound length,
    /// outbound length.
    fn to_sam_options(&self) -> String {
        let options = [
            ("inbound.quantity", self.inbound_quantity),
            ("outbound.quantity", self.outbound_quantity),
            ("inbound.length", self.inbound_length),
            ("outbound.length", self.outbound_length),
        ];
        options
            .iter()
            .map(|(key, value)| format!("{key}={value}"))
            .collect::<Vec<_>>()
            .join(" ")
    }
}
/// A SAM session created with `SESSION CREATE STYLE=STREAM`.
///
/// Streams and naming lookups each open a fresh socket to the same bridge
/// and refer to this session by its id.
pub struct SamSession {
/// Host of the SAM bridge, reused for every additional socket.
sam_host: String,
/// Port of the SAM bridge, reused for every additional socket.
sam_port: u16,
/// Destination the bridge reported in its `SESSION STATUS` reply.
destination: I2pDestination,
/// Session id sent as `ID=` in session and stream commands.
session_id: String,
/// Socket on which the session was created. It is stored but never read or
/// written again by the code in this file.
/// NOTE(review): presumably held open so the bridge keeps the session
/// alive — confirm against the SAM specification.
_control_stream: TcpStream,
/// Tunnel options the session was created with; kept but currently unused.
#[allow(dead_code)]
tunnel_config: SamTunnelConfig,
}
/// Debug output shows only the session id and the destination.
#[allow(
    clippy::missing_fields_in_debug,
    reason = "intentionally omit internal channel fields from Debug output"
)]
impl fmt::Debug for SamSession {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("SamSession");
        out.field("session_id", &self.session_id);
        out.field("destination", &self.destination);
        out.finish()
    }
}
/// A stream socket returned by `SamSession::connect` or `SamSession::accept`,
/// paired with the destination of the remote peer.
pub struct SamStream {
    stream: TcpStream,
    remote_destination: I2pDestination,
}

impl SamStream {
    /// Destination of the peer at the other end of the stream.
    pub fn remote_destination(&self) -> &I2pDestination {
        let Self {
            remote_destination, ..
        } = self;
        remote_destination
    }

    /// Consumes the wrapper and hands back the underlying socket.
    pub fn into_inner(self) -> TcpStream {
        let Self { stream, .. } = self;
        stream
    }

    /// Shared access to the underlying socket.
    pub fn inner(&self) -> &TcpStream {
        let Self { stream, .. } = self;
        stream
    }

    /// Exclusive access to the underlying socket.
    pub fn inner_mut(&mut self) -> &mut TcpStream {
        let Self { stream, .. } = self;
        stream
    }
}
impl SamSession {
pub async fn create(
host: &str,
port: u16,
session_id: &str,
tunnel_config: SamTunnelConfig,
) -> Result<Self, SamError> {
let addr = format!("{host}:{port}");
let stream = TcpStream::connect(&addr)
.await
.map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
let mut reader = BufReader::new(stream);
let hello_cmd = "HELLO VERSION MIN=3.1 MAX=3.1\n";
reader.get_mut().write_all(hello_cmd.as_bytes()).await?;
let mut line = String::new();
reader.read_line(&mut line).await?;
let reply = SamReply::parse(&line)?;
if reply.major != "HELLO" || reply.minor != "REPLY" || !reply.is_ok() {
return Err(SamError::HandshakeFailed(format!(
"unexpected reply: {} ({})",
reply.result(),
reply.message().unwrap_or("no message"),
)));
}
let version = reply.pairs.get("VERSION").cloned().unwrap_or_default();
debug!("SAM handshake OK, version {version}");
let tunnel_opts = tunnel_config.to_sam_options();
let session_cmd = format!(
"SESSION CREATE STYLE=STREAM ID={session_id} DESTINATION=TRANSIENT {tunnel_opts}\n"
);
reader.get_mut().write_all(session_cmd.as_bytes()).await?;
line.clear();
reader.read_line(&mut line).await?;
let reply = SamReply::parse(&line)?;
if reply.major != "SESSION" || reply.minor != "STATUS" || !reply.is_ok() {
return Err(SamError::SessionCreateFailed(format!(
"{} ({})",
reply.result(),
reply.message().unwrap_or("no message"),
)));
}
let dest_b64 = reply
.pairs
.get("DESTINATION")
.ok_or_else(|| SamError::SessionCreateFailed("missing DESTINATION in reply".into()))?;
let destination = I2pDestination::from_base64(dest_b64).map_err(|e| {
SamError::InvalidDestination(format!("bad destination in SESSION STATUS: {e}"))
})?;
info!(
session_id = session_id,
dest_len = destination.len(),
"SAM session created"
);
let control_stream = reader.into_inner();
Ok(Self {
destination,
session_id: session_id.to_string(),
sam_host: host.to_string(),
sam_port: port,
tunnel_config,
_control_stream: control_stream,
})
}
pub fn destination(&self) -> &I2pDestination {
&self.destination
}
pub fn session_id(&self) -> &str {
&self.session_id
}
pub async fn connect(&self, dest: &I2pDestination) -> Result<SamStream, SamError> {
let addr = format!("{}:{}", self.sam_host, self.sam_port);
let stream = TcpStream::connect(&addr)
.await
.map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
let mut reader = BufReader::new(stream);
reader
.get_mut()
.write_all(b"HELLO VERSION MIN=3.1 MAX=3.1\n")
.await?;
let mut line = String::new();
reader.read_line(&mut line).await?;
let reply = SamReply::parse(&line)?;
if !reply.is_ok() {
return Err(SamError::HandshakeFailed(format!(
"connect re-handshake: {}",
reply.result()
)));
}
let dest_b64 = dest.to_base64();
let cmd = format!(
"STREAM CONNECT ID={} DESTINATION={} SILENT=false\n",
self.session_id, dest_b64,
);
reader.get_mut().write_all(cmd.as_bytes()).await?;
line.clear();
reader.read_line(&mut line).await?;
let reply = SamReply::parse(&line)?;
if reply.major != "STREAM" || reply.minor != "STATUS" || !reply.is_ok() {
return Err(SamError::StreamConnectFailed(format!(
"{} ({})",
reply.result(),
reply.message().unwrap_or("no message"),
)));
}
debug!(dest = %dest, "SAM stream connected");
let stream = reader.into_inner();
Ok(SamStream {
stream,
remote_destination: dest.clone(),
})
}
pub async fn accept(&self) -> Result<SamStream, SamError> {
let addr = format!("{}:{}", self.sam_host, self.sam_port);
let stream = TcpStream::connect(&addr)
.await
.map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
let mut reader = BufReader::new(stream);
reader
.get_mut()
.write_all(b"HELLO VERSION MIN=3.1 MAX=3.1\n")
.await?;
let mut line = String::new();
reader.read_line(&mut line).await?;
let reply = SamReply::parse(&line)?;
if !reply.is_ok() {
return Err(SamError::HandshakeFailed(format!(
"accept re-handshake: {}",
reply.result()
)));
}
let cmd = format!("STREAM ACCEPT ID={} SILENT=false\n", self.session_id);
reader.get_mut().write_all(cmd.as_bytes()).await?;
line.clear();
reader.read_line(&mut line).await?;
let reply = SamReply::parse(&line)?;
if reply.major != "STREAM" || reply.minor != "STATUS" || !reply.is_ok() {
return Err(SamError::StreamAcceptFailed(format!(
"{} ({})",
reply.result(),
reply.message().unwrap_or("no message"),
)));
}
line.clear();
reader.read_line(&mut line).await?;
let remote_dest_b64 = line.trim();
let remote_destination = I2pDestination::from_base64(remote_dest_b64)
.map_err(|e| SamError::InvalidDestination(format!("incoming destination: {e}")))?;
debug!(remote = %remote_destination, "SAM stream accepted");
let stream = reader.into_inner();
Ok(SamStream {
stream,
remote_destination,
})
}
pub async fn naming_lookup(&self, name: &str) -> Result<I2pDestination, SamError> {
let addr = format!("{}:{}", self.sam_host, self.sam_port);
let stream = TcpStream::connect(&addr)
.await
.map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
let mut reader = BufReader::new(stream);
reader
.get_mut()
.write_all(b"HELLO VERSION MIN=3.1 MAX=3.1\n")
.await?;
let mut line = String::new();
reader.read_line(&mut line).await?;
let reply = SamReply::parse(&line)?;
if !reply.is_ok() {
return Err(SamError::HandshakeFailed(format!(
"naming re-handshake: {}",
reply.result()
)));
}
let cmd = format!("NAMING LOOKUP NAME={name}\n");
reader.get_mut().write_all(cmd.as_bytes()).await?;
line.clear();
reader.read_line(&mut line).await?;
let reply = SamReply::parse(&line)?;
if reply.major != "NAMING" || reply.minor != "REPLY" || !reply.is_ok() {
return Err(SamError::NamingLookupFailed(format!(
"{}: {} ({})",
name,
reply.result(),
reply.message().unwrap_or("no message"),
)));
}
let dest_b64 = reply.pairs.get("VALUE").ok_or_else(|| {
SamError::NamingLookupFailed(format!("{name}: missing VALUE in reply"))
})?;
I2pDestination::from_base64(dest_b64)
.map_err(|e| SamError::InvalidDestination(format!("{name}: {e}")))
}
}
// Unit tests for reply parsing, key/value parsing, tunnel option rendering
// and error conversion. None of them touch the network.
#[cfg(test)]
mod tests {
use super::*;
// A successful handshake reply yields both tokens and the VERSION pair.
#[test]
fn sam_reply_parse_hello() {
let reply = SamReply::parse("HELLO REPLY RESULT=OK VERSION=3.1").unwrap();
assert_eq!(reply.major, "HELLO");
assert_eq!(reply.minor, "REPLY");
assert!(reply.is_ok());
assert_eq!(reply.pairs.get("VERSION").unwrap(), "3.1");
}
// A non-OK result is reported by `result()` and a quoted MESSAGE keeps its space.
#[test]
fn sam_reply_parse_error() {
let reply =
SamReply::parse("SESSION STATUS RESULT=DUPLICATED_ID MESSAGE=\"session exists\"")
.unwrap();
assert_eq!(reply.major, "SESSION");
assert_eq!(reply.minor, "STATUS");
assert!(!reply.is_ok());
assert_eq!(reply.result(), "DUPLICATED_ID");
assert_eq!(reply.message(), Some("session exists"));
}
// A long base64 DESTINATION value survives parsing unchanged.
#[test]
fn sam_reply_parse_session_create_ok() {
let dest_b64 = super::super::destination::i2p_base64_encode(&[42u8; 516]);
let line = format!("SESSION STATUS RESULT=OK DESTINATION={dest_b64}");
let reply = SamReply::parse(&line).unwrap();
assert!(reply.is_ok());
assert_eq!(reply.pairs.get("DESTINATION").unwrap(), &dest_b64);
}
// A reply with a single key/value pair parses as OK.
#[test]
fn sam_reply_parse_stream_status() {
let reply = SamReply::parse("STREAM STATUS RESULT=OK").unwrap();
assert_eq!(reply.major, "STREAM");
assert_eq!(reply.minor, "STATUS");
assert!(reply.is_ok());
}
// A successful naming reply exposes both NAME and VALUE.
#[test]
fn sam_reply_parse_naming_ok() {
let dest_b64 = super::super::destination::i2p_base64_encode(&[7u8; 400]);
let line = format!("NAMING REPLY RESULT=OK NAME=test.i2p VALUE={dest_b64}");
let reply = SamReply::parse(&line).unwrap();
assert!(reply.is_ok());
assert_eq!(reply.pairs.get("NAME").unwrap(), "test.i2p");
assert_eq!(reply.pairs.get("VALUE").unwrap(), &dest_b64);
}
// A failed naming reply is not OK and reports its RESULT value.
#[test]
fn sam_reply_parse_naming_error() {
let reply = SamReply::parse("NAMING REPLY RESULT=KEY_NOT_FOUND NAME=unknown.i2p").unwrap();
assert!(!reply.is_ok());
assert_eq!(reply.result(), "KEY_NOT_FOUND");
}
// An empty line is a protocol error.
#[test]
fn sam_reply_parse_empty_line() {
let err = SamReply::parse("").unwrap_err();
assert!(matches!(err, SamError::ProtocolError(_)));
}
// A line with only one token (no minor) is a protocol error.
#[test]
fn sam_reply_parse_single_token() {
let err = SamReply::parse("HELLO").unwrap_err();
assert!(matches!(err, SamError::ProtocolError(_)));
}
// Quoted values may contain spaces; the quotes are stripped.
#[test]
fn parse_key_value_quoted_message() {
let mut pairs = HashMap::new();
parse_key_value_pairs(
"RESULT=I2P_ERROR MESSAGE=\"tunnel build failed\"",
&mut pairs,
);
assert_eq!(pairs.get("RESULT").unwrap(), "I2P_ERROR");
assert_eq!(pairs.get("MESSAGE").unwrap(), "tunnel build failed");
}
// Several unquoted pairs on one line are all collected.
#[test]
fn parse_key_value_multiple_unquoted() {
let mut pairs = HashMap::new();
parse_key_value_pairs("A=1 B=hello C=world", &mut pairs);
assert_eq!(pairs.get("A").unwrap(), "1");
assert_eq!(pairs.get("B").unwrap(), "hello");
assert_eq!(pairs.get("C").unwrap(), "world");
}
// Every tunnel option defaults to 3.
#[test]
fn tunnel_config_default() {
let cfg = SamTunnelConfig::default();
assert_eq!(cfg.inbound_quantity, 3);
assert_eq!(cfg.outbound_quantity, 3);
assert_eq!(cfg.inbound_length, 3);
assert_eq!(cfg.outbound_length, 3);
}
// Each field is rendered under its dotted SAM option name.
#[test]
fn tunnel_config_to_sam_options() {
let cfg = SamTunnelConfig {
inbound_quantity: 2,
outbound_quantity: 4,
inbound_length: 1,
outbound_length: 2,
};
let opts = cfg.to_sam_options();
assert!(opts.contains("inbound.quantity=2"));
assert!(opts.contains("outbound.quantity=4"));
assert!(opts.contains("inbound.length=1"));
assert!(opts.contains("outbound.length=2"));
}
// The display text contains both the variant prefix and the detail string.
#[test]
fn sam_error_display() {
let err = SamError::HandshakeFailed("version mismatch".into());
assert!(err.to_string().contains("handshake"));
assert!(err.to_string().contains("version mismatch"));
}
// An io::Error converts into the IoError variant.
#[test]
fn sam_error_from_io() {
let io_err = std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "refused");
let sam_err = SamError::from(io_err);
assert!(matches!(sam_err, SamError::IoError(_)));
}
}