1use std::collections::HashMap;
14use std::fmt;
15
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
17use tokio::net::TcpStream;
18use tracing::{debug, info};
19
20use super::destination::I2pDestination;
21
/// Errors produced while talking to a SAM bridge.
///
/// Every variant carries a free-form detail string that is appended to the
/// variant's fixed prefix in the `Display` output.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SamError {
    /// The TCP connection to the SAM bridge could not be established.
    #[error("SAM connection failed: {0}")]
    ConnectionFailed(String),

    /// The bridge did not answer `HELLO VERSION` with a successful reply.
    #[error("SAM handshake failed: {0}")]
    HandshakeFailed(String),

    /// `SESSION CREATE` was rejected, or its reply lacked the destination.
    #[error("SAM session creation failed: {0}")]
    SessionCreateFailed(String),

    /// `STREAM CONNECT` was rejected by the bridge.
    #[error("SAM stream connect failed: {0}")]
    StreamConnectFailed(String),

    /// `STREAM ACCEPT` was rejected by the bridge.
    #[error("SAM stream accept failed: {0}")]
    StreamAcceptFailed(String),

    /// `NAMING LOOKUP` failed, or its reply lacked the looked-up value.
    #[error("SAM naming lookup failed: {0}")]
    NamingLookupFailed(String),

    /// A reply line could not be split into the expected tokens.
    #[error("SAM protocol error: {0}")]
    ProtocolError(String),

    /// A socket read or write failed. Only the error text is kept, which is
    /// what allows this enum to derive `Clone`.
    #[error("SAM I/O error: {0}")]
    IoError(String),

    /// A destination string received from the bridge could not be decoded.
    #[error("SAM invalid destination: {0}")]
    InvalidDestination(String),
}
61
62impl From<std::io::Error> for SamError {
63 fn from(e: std::io::Error) -> Self {
64 Self::IoError(e.to_string())
65 }
66}
67
/// One parsed reply line from the SAM bridge.
#[derive(Debug, Clone)]
pub(crate) struct SamReply {
    /// First space-separated token of the line (e.g. `HELLO`, `SESSION`, `STREAM`, `NAMING`).
    pub major: String,
    /// Second space-separated token of the line (e.g. `REPLY`, `STATUS`).
    pub minor: String,
    /// `KEY=VALUE` options that followed the two leading tokens.
    pub pairs: HashMap<String, String>,
}
78
79impl SamReply {
80 pub fn parse(line: &str) -> Result<Self, SamError> {
85 let line = line.trim();
86 let mut parts = line.splitn(3, ' ');
87
88 let major = parts
89 .next()
90 .filter(|s| !s.is_empty())
91 .ok_or_else(|| SamError::ProtocolError("empty reply".into()))?
92 .to_string();
93 let minor = parts
94 .next()
95 .ok_or_else(|| SamError::ProtocolError(format!("missing minor token in: {line}")))?
96 .to_string();
97
98 let mut pairs = HashMap::new();
99 if let Some(rest) = parts.next() {
100 parse_key_value_pairs(rest, &mut pairs);
101 }
102
103 Ok(Self {
104 major,
105 minor,
106 pairs,
107 })
108 }
109
110 pub fn is_ok(&self) -> bool {
112 self.pairs.get("RESULT").is_some_and(|v| v == "OK")
113 }
114
115 pub fn result(&self) -> &str {
117 self.pairs
118 .get("RESULT")
119 .map_or("UNKNOWN", std::string::String::as_str)
120 }
121
122 pub fn message(&self) -> Option<&str> {
124 self.pairs.get("MESSAGE").map(std::string::String::as_str)
125 }
126}
127
/// Collects every `KEY=VALUE` token of `s` into `pairs`.
///
/// Tokens are separated by spaces. A value that starts with `"` runs up to
/// the next `"` (or to the end of the input when the quote is never closed)
/// and may therefore contain spaces; the quotes themselves are not stored.
/// Tokens without `=` are ignored, and a key that occurs twice keeps its
/// last value.
fn parse_key_value_pairs(s: &str, pairs: &mut HashMap<String, String>) {
    let mut rest = s;

    loop {
        rest = rest.trim_start_matches(' ');

        // A key ends at the first `=`; reaching a space first means the token
        // has no value. No delimiter at all means the input is exhausted or
        // ends in a bare token, and either way nothing more can be parsed.
        let Some(stop) = rest.find(['=', ' ']) else {
            break;
        };
        let Some(after_eq) = rest[stop..].strip_prefix('=') else {
            // Bare token: drop it and resume at the space that ended it.
            rest = &rest[stop..];
            continue;
        };
        let key = &rest[..stop];

        let (value, remainder) = if let Some(quoted) = after_eq.strip_prefix('"') {
            match quoted.split_once('"') {
                Some((inner, tail)) => (inner, tail),
                // Unterminated quote: the value is everything that is left.
                None => (quoted, ""),
            }
        } else {
            match after_eq.find(' ') {
                Some(end) => after_eq.split_at(end),
                None => (after_eq, ""),
            }
        };

        pairs.insert(key.to_string(), value.to_string());
        rest = remainder;
    }
}
182
/// Tunnel settings that are sent as options when a session is created.
#[derive(Debug, Clone)]
pub struct SamTunnelConfig {
    /// Value sent as the `inbound.quantity` option.
    pub inbound_quantity: u8,
    /// Value sent as the `outbound.quantity` option.
    pub outbound_quantity: u8,
    /// Value sent as the `inbound.length` option.
    pub inbound_length: u8,
    /// Value sent as the `outbound.length` option.
    pub outbound_length: u8,
}

impl Default for SamTunnelConfig {
    /// Uses `3` for both quantities and both lengths.
    fn default() -> Self {
        const QUANTITY: u8 = 3;
        const LENGTH: u8 = 3;

        Self {
            inbound_length: LENGTH,
            outbound_length: LENGTH,
            inbound_quantity: QUANTITY,
            outbound_quantity: QUANTITY,
        }
    }
}

impl SamTunnelConfig {
    /// Renders the four settings as space-separated `name=value` options, in
    /// the order inbound quantity, outbound quantity, inbound length,
    /// outbound length, ready to be appended to a SAM command line.
    fn to_sam_options(&self) -> String {
        format!(
            "inbound.quantity={} outbound.quantity={} inbound.length={} outbound.length={}",
            self.inbound_quantity,
            self.outbound_quantity,
            self.inbound_length,
            self.outbound_length,
        )
    }
}
219
/// A SAM session together with the socket it was created on.
///
/// Further connections to the same bridge (for outgoing streams, incoming
/// streams and name lookups) are opened from `sam_host`/`sam_port` and refer
/// to the session by `session_id`.
pub struct SamSession {
    /// Host of the SAM bridge, reused for every additional connection.
    sam_host: String,
    /// TCP port of the SAM bridge.
    sam_port: u16,
    /// Destination the bridge reported in its `SESSION STATUS` reply.
    destination: I2pDestination,
    /// ID passed to `SESSION CREATE` and referenced by later `STREAM` commands.
    session_id: String,
    /// Socket the session was created on. It is never read again; it is held
    /// so that it is not dropped (and thereby closed) while this value lives.
    // NOTE(review): presumably the bridge tears the session down once this
    // socket closes — confirm against the SAM specification.
    _control_stream: TcpStream,
    /// Tunnel settings the session was created with. Not read by the code
    /// visible here, hence the lint exemption.
    #[allow(dead_code)]
    tunnel_config: SamTunnelConfig,
}
246
247#[allow(
248 clippy::missing_fields_in_debug,
249 reason = "intentionally omit internal channel fields from Debug output"
250)]
251impl fmt::Debug for SamSession {
252 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253 f.debug_struct("SamSession")
254 .field("session_id", &self.session_id)
255 .field("destination", &self.destination)
256 .finish()
257 }
258}
259
/// An established SAM stream: the TCP socket to the bridge plus the
/// destination of the peer on the other end.
pub struct SamStream {
    /// Socket to the bridge, handed over after the SAM command/reply lines
    /// have been exchanged on it.
    stream: TcpStream,
    /// Destination of the remote peer.
    remote_destination: I2pDestination,
}
267
268impl SamStream {
269 pub fn remote_destination(&self) -> &I2pDestination {
271 &self.remote_destination
272 }
273
274 pub fn into_inner(self) -> TcpStream {
279 self.stream
280 }
281
282 pub fn inner(&self) -> &TcpStream {
284 &self.stream
285 }
286
287 pub fn inner_mut(&mut self) -> &mut TcpStream {
289 &mut self.stream
290 }
291}
292
293impl SamSession {
294 pub async fn create(
308 host: &str,
309 port: u16,
310 session_id: &str,
311 tunnel_config: SamTunnelConfig,
312 ) -> Result<Self, SamError> {
313 let addr = format!("{host}:{port}");
314 let stream = TcpStream::connect(&addr)
315 .await
316 .map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
317
318 let mut reader = BufReader::new(stream);
319
320 let hello_cmd = "HELLO VERSION MIN=3.1 MAX=3.1\n";
322 reader.get_mut().write_all(hello_cmd.as_bytes()).await?;
323
324 let mut line = String::new();
325 reader.read_line(&mut line).await?;
326 let reply = SamReply::parse(&line)?;
327
328 if reply.major != "HELLO" || reply.minor != "REPLY" || !reply.is_ok() {
329 return Err(SamError::HandshakeFailed(format!(
330 "unexpected reply: {} ({})",
331 reply.result(),
332 reply.message().unwrap_or("no message"),
333 )));
334 }
335
336 let version = reply.pairs.get("VERSION").cloned().unwrap_or_default();
337 debug!("SAM handshake OK, version {version}");
338
339 let tunnel_opts = tunnel_config.to_sam_options();
341 let session_cmd = format!(
342 "SESSION CREATE STYLE=STREAM ID={session_id} DESTINATION=TRANSIENT {tunnel_opts}\n"
343 );
344 reader.get_mut().write_all(session_cmd.as_bytes()).await?;
345
346 line.clear();
347 reader.read_line(&mut line).await?;
348 let reply = SamReply::parse(&line)?;
349
350 if reply.major != "SESSION" || reply.minor != "STATUS" || !reply.is_ok() {
351 return Err(SamError::SessionCreateFailed(format!(
352 "{} ({})",
353 reply.result(),
354 reply.message().unwrap_or("no message"),
355 )));
356 }
357
358 let dest_b64 = reply
360 .pairs
361 .get("DESTINATION")
362 .ok_or_else(|| SamError::SessionCreateFailed("missing DESTINATION in reply".into()))?;
363
364 let destination = I2pDestination::from_base64(dest_b64).map_err(|e| {
365 SamError::InvalidDestination(format!("bad destination in SESSION STATUS: {e}"))
366 })?;
367
368 info!(
369 session_id = session_id,
370 dest_len = destination.len(),
371 "SAM session created"
372 );
373
374 let control_stream = reader.into_inner();
377
378 Ok(Self {
379 destination,
380 session_id: session_id.to_string(),
381 sam_host: host.to_string(),
382 sam_port: port,
383 tunnel_config,
384 _control_stream: control_stream,
385 })
386 }
387
388 pub fn destination(&self) -> &I2pDestination {
390 &self.destination
391 }
392
393 pub fn session_id(&self) -> &str {
395 &self.session_id
396 }
397
398 pub async fn connect(&self, dest: &I2pDestination) -> Result<SamStream, SamError> {
410 let addr = format!("{}:{}", self.sam_host, self.sam_port);
411 let stream = TcpStream::connect(&addr)
412 .await
413 .map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
414
415 let mut reader = BufReader::new(stream);
416
417 reader
419 .get_mut()
420 .write_all(b"HELLO VERSION MIN=3.1 MAX=3.1\n")
421 .await?;
422 let mut line = String::new();
423 reader.read_line(&mut line).await?;
424 let reply = SamReply::parse(&line)?;
425 if !reply.is_ok() {
426 return Err(SamError::HandshakeFailed(format!(
427 "connect re-handshake: {}",
428 reply.result()
429 )));
430 }
431
432 let dest_b64 = dest.to_base64();
434 let cmd = format!(
435 "STREAM CONNECT ID={} DESTINATION={} SILENT=false\n",
436 self.session_id, dest_b64,
437 );
438 reader.get_mut().write_all(cmd.as_bytes()).await?;
439
440 line.clear();
441 reader.read_line(&mut line).await?;
442 let reply = SamReply::parse(&line)?;
443
444 if reply.major != "STREAM" || reply.minor != "STATUS" || !reply.is_ok() {
445 return Err(SamError::StreamConnectFailed(format!(
446 "{} ({})",
447 reply.result(),
448 reply.message().unwrap_or("no message"),
449 )));
450 }
451
452 debug!(dest = %dest, "SAM stream connected");
453
454 let stream = reader.into_inner();
456 Ok(SamStream {
457 stream,
458 remote_destination: dest.clone(),
459 })
460 }
461
462 pub async fn accept(&self) -> Result<SamStream, SamError> {
474 let addr = format!("{}:{}", self.sam_host, self.sam_port);
475 let stream = TcpStream::connect(&addr)
476 .await
477 .map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
478
479 let mut reader = BufReader::new(stream);
480
481 reader
483 .get_mut()
484 .write_all(b"HELLO VERSION MIN=3.1 MAX=3.1\n")
485 .await?;
486 let mut line = String::new();
487 reader.read_line(&mut line).await?;
488 let reply = SamReply::parse(&line)?;
489 if !reply.is_ok() {
490 return Err(SamError::HandshakeFailed(format!(
491 "accept re-handshake: {}",
492 reply.result()
493 )));
494 }
495
496 let cmd = format!("STREAM ACCEPT ID={} SILENT=false\n", self.session_id);
498 reader.get_mut().write_all(cmd.as_bytes()).await?;
499
500 line.clear();
501 reader.read_line(&mut line).await?;
502 let reply = SamReply::parse(&line)?;
503
504 if reply.major != "STREAM" || reply.minor != "STATUS" || !reply.is_ok() {
505 return Err(SamError::StreamAcceptFailed(format!(
506 "{} ({})",
507 reply.result(),
508 reply.message().unwrap_or("no message"),
509 )));
510 }
511
512 line.clear();
514 reader.read_line(&mut line).await?;
515 let remote_dest_b64 = line.trim();
516
517 let remote_destination = I2pDestination::from_base64(remote_dest_b64)
518 .map_err(|e| SamError::InvalidDestination(format!("incoming destination: {e}")))?;
519
520 debug!(remote = %remote_destination, "SAM stream accepted");
521
522 let stream = reader.into_inner();
523 Ok(SamStream {
524 stream,
525 remote_destination,
526 })
527 }
528
529 pub async fn naming_lookup(&self, name: &str) -> Result<I2pDestination, SamError> {
538 let addr = format!("{}:{}", self.sam_host, self.sam_port);
539 let stream = TcpStream::connect(&addr)
540 .await
541 .map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
542
543 let mut reader = BufReader::new(stream);
544
545 reader
547 .get_mut()
548 .write_all(b"HELLO VERSION MIN=3.1 MAX=3.1\n")
549 .await?;
550 let mut line = String::new();
551 reader.read_line(&mut line).await?;
552 let reply = SamReply::parse(&line)?;
553 if !reply.is_ok() {
554 return Err(SamError::HandshakeFailed(format!(
555 "naming re-handshake: {}",
556 reply.result()
557 )));
558 }
559
560 let cmd = format!("NAMING LOOKUP NAME={name}\n");
562 reader.get_mut().write_all(cmd.as_bytes()).await?;
563
564 line.clear();
565 reader.read_line(&mut line).await?;
566 let reply = SamReply::parse(&line)?;
567
568 if reply.major != "NAMING" || reply.minor != "REPLY" || !reply.is_ok() {
569 return Err(SamError::NamingLookupFailed(format!(
570 "{}: {} ({})",
571 name,
572 reply.result(),
573 reply.message().unwrap_or("no message"),
574 )));
575 }
576
577 let dest_b64 = reply.pairs.get("VALUE").ok_or_else(|| {
578 SamError::NamingLookupFailed(format!("{name}: missing VALUE in reply"))
579 })?;
580
581 I2pDestination::from_base64(dest_b64)
582 .map_err(|e| SamError::InvalidDestination(format!("{name}: {e}")))
583 }
584}
585
// Unit tests for reply parsing, tunnel option rendering and error conversion.
// None of them opens a socket.
#[cfg(test)]
mod tests {
    use super::*;

    /// A successful HELLO reply exposes both leading tokens and its options.
    #[test]
    fn sam_reply_parse_hello() {
        let reply = SamReply::parse("HELLO REPLY RESULT=OK VERSION=3.1").unwrap();
        assert_eq!(reply.major, "HELLO");
        assert_eq!(reply.minor, "REPLY");
        assert!(reply.is_ok());
        assert_eq!(reply.pairs.get("VERSION").unwrap(), "3.1");
    }

    /// A failed reply reports its result code and the unquoted message text.
    #[test]
    fn sam_reply_parse_error() {
        let reply =
            SamReply::parse("SESSION STATUS RESULT=DUPLICATED_ID MESSAGE=\"session exists\"")
                .unwrap();
        assert_eq!(reply.major, "SESSION");
        assert_eq!(reply.minor, "STATUS");
        assert!(!reply.is_ok());
        assert_eq!(reply.result(), "DUPLICATED_ID");
        assert_eq!(reply.message(), Some("session exists"));
    }

    /// A long encoded destination survives parsing unchanged.
    #[test]
    fn sam_reply_parse_session_create_ok() {
        let dest_b64 = super::super::destination::i2p_base64_encode(&[42u8; 516]);
        let line = format!("SESSION STATUS RESULT=OK DESTINATION={dest_b64}");
        let reply = SamReply::parse(&line).unwrap();
        assert!(reply.is_ok());
        assert_eq!(reply.pairs.get("DESTINATION").unwrap(), &dest_b64);
    }

    /// A reply with a single option parses and counts as successful.
    #[test]
    fn sam_reply_parse_stream_status() {
        let reply = SamReply::parse("STREAM STATUS RESULT=OK").unwrap();
        assert_eq!(reply.major, "STREAM");
        assert_eq!(reply.minor, "STATUS");
        assert!(reply.is_ok());
    }

    /// A naming reply exposes both the looked-up name and its value.
    #[test]
    fn sam_reply_parse_naming_ok() {
        let dest_b64 = super::super::destination::i2p_base64_encode(&[7u8; 400]);
        let line = format!("NAMING REPLY RESULT=OK NAME=test.i2p VALUE={dest_b64}");
        let reply = SamReply::parse(&line).unwrap();
        assert!(reply.is_ok());
        assert_eq!(reply.pairs.get("NAME").unwrap(), "test.i2p");
        assert_eq!(reply.pairs.get("VALUE").unwrap(), &dest_b64);
    }

    /// Any result other than `OK` makes `is_ok` false and is returned as-is.
    #[test]
    fn sam_reply_parse_naming_error() {
        let reply = SamReply::parse("NAMING REPLY RESULT=KEY_NOT_FOUND NAME=unknown.i2p").unwrap();
        assert!(!reply.is_ok());
        assert_eq!(reply.result(), "KEY_NOT_FOUND");
    }

    /// An empty line is a protocol error.
    #[test]
    fn sam_reply_parse_empty_line() {
        let err = SamReply::parse("").unwrap_err();
        assert!(matches!(err, SamError::ProtocolError(_)));
    }

    /// A line without a second token is a protocol error.
    #[test]
    fn sam_reply_parse_single_token() {
        let err = SamReply::parse("HELLO").unwrap_err();
        assert!(matches!(err, SamError::ProtocolError(_)));
    }

    /// A quoted value keeps its inner spaces and loses its quotes.
    #[test]
    fn parse_key_value_quoted_message() {
        let mut pairs = HashMap::new();
        parse_key_value_pairs(
            "RESULT=I2P_ERROR MESSAGE=\"tunnel build failed\"",
            &mut pairs,
        );
        assert_eq!(pairs.get("RESULT").unwrap(), "I2P_ERROR");
        assert_eq!(pairs.get("MESSAGE").unwrap(), "tunnel build failed");
    }

    /// Several unquoted pairs are split on spaces.
    #[test]
    fn parse_key_value_multiple_unquoted() {
        let mut pairs = HashMap::new();
        parse_key_value_pairs("A=1 B=hello C=world", &mut pairs);
        assert_eq!(pairs.get("A").unwrap(), "1");
        assert_eq!(pairs.get("B").unwrap(), "hello");
        assert_eq!(pairs.get("C").unwrap(), "world");
    }

    /// The default configuration uses 3 for every setting.
    #[test]
    fn tunnel_config_default() {
        let cfg = SamTunnelConfig::default();
        assert_eq!(cfg.inbound_quantity, 3);
        assert_eq!(cfg.outbound_quantity, 3);
        assert_eq!(cfg.inbound_length, 3);
        assert_eq!(cfg.outbound_length, 3);
    }

    /// Every setting appears in the rendered option string under its SAM name.
    #[test]
    fn tunnel_config_to_sam_options() {
        let cfg = SamTunnelConfig {
            inbound_quantity: 2,
            outbound_quantity: 4,
            inbound_length: 1,
            outbound_length: 2,
        };
        let opts = cfg.to_sam_options();
        assert!(opts.contains("inbound.quantity=2"));
        assert!(opts.contains("outbound.quantity=4"));
        assert!(opts.contains("inbound.length=1"));
        assert!(opts.contains("outbound.length=2"));
    }

    /// `Display` contains both the variant's prefix and the detail string.
    #[test]
    fn sam_error_display() {
        let err = SamError::HandshakeFailed("version mismatch".into());
        assert!(err.to_string().contains("handshake"));
        assert!(err.to_string().contains("version mismatch"));
    }

    /// An I/O error converts into the `IoError` variant.
    #[test]
    fn sam_error_from_io() {
        let io_err = std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "refused");
        let sam_err = SamError::from(io_err);
        assert!(matches!(sam_err, SamError::IoError(_)));
    }
}