1use std::collections::HashMap;
14use std::fmt;
15
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
17use tokio::net::TcpStream;
18use tracing::{debug, info};
19
20use super::destination::I2pDestination;
21
/// Errors reported by the SAM client in this module.
///
/// Every variant carries a human-readable detail string that is appended to
/// the fixed prefix given in its `#[error]` attribute.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SamError {
    /// The TCP connection to the SAM bridge could not be opened.
    #[error("SAM connection failed: {0}")]
    ConnectionFailed(String),

    /// The reply to the `HELLO VERSION` command was not an OK reply.
    #[error("SAM handshake failed: {0}")]
    HandshakeFailed(String),

    /// `SESSION CREATE` was rejected, or its reply carried no `DESTINATION`.
    #[error("SAM session creation failed: {0}")]
    SessionCreateFailed(String),

    /// `STREAM CONNECT` did not answer with an OK `STREAM STATUS`.
    #[error("SAM stream connect failed: {0}")]
    StreamConnectFailed(String),

    /// `STREAM ACCEPT` did not answer with an OK `STREAM STATUS`.
    #[error("SAM stream accept failed: {0}")]
    StreamAcceptFailed(String),

    /// `NAMING LOOKUP` failed, or its reply carried no `VALUE`.
    #[error("SAM naming lookup failed: {0}")]
    NamingLookupFailed(String),

    /// A reply line could not be tokenised (it was empty or had no second token).
    #[error("SAM protocol error: {0}")]
    ProtocolError(String),

    /// An I/O error, kept as its display text only.
    // NOTE(review): presumably stored as a `String` so the enum can derive
    // `Clone` (`std::io::Error` is not `Clone`) — confirm before changing.
    #[error("SAM I/O error: {0}")]
    IoError(String),

    /// A base64 destination received from the bridge failed to decode.
    #[error("SAM invalid destination: {0}")]
    InvalidDestination(String),
}
61
62impl From<std::io::Error> for SamError {
63 fn from(e: std::io::Error) -> Self {
64 SamError::IoError(e.to_string())
65 }
66}
67
/// One parsed reply line received from the SAM bridge.
///
/// A line has the shape `MAJOR MINOR KEY=VALUE KEY="quoted value" ...`.
#[derive(Debug, Clone)]
pub(crate) struct SamReply {
    /// First token of the line, e.g. `HELLO`, `SESSION`, `STREAM`, `NAMING`.
    pub major: String,
    /// Second token of the line, e.g. `REPLY` or `STATUS`.
    pub minor: String,
    /// The `KEY=VALUE` pairs that follow the two leading tokens.
    pub pairs: HashMap<String, String>,
}
78
79impl SamReply {
80 pub fn parse(line: &str) -> Result<Self, SamError> {
85 let line = line.trim();
86 let mut parts = line.splitn(3, ' ');
87
88 let major = parts
89 .next()
90 .filter(|s| !s.is_empty())
91 .ok_or_else(|| SamError::ProtocolError("empty reply".into()))?
92 .to_string();
93 let minor = parts
94 .next()
95 .ok_or_else(|| SamError::ProtocolError(format!("missing minor token in: {line}")))?
96 .to_string();
97
98 let mut pairs = HashMap::new();
99 if let Some(rest) = parts.next() {
100 parse_key_value_pairs(rest, &mut pairs);
101 }
102
103 Ok(SamReply {
104 major,
105 minor,
106 pairs,
107 })
108 }
109
110 pub fn is_ok(&self) -> bool {
112 self.pairs.get("RESULT").is_some_and(|v| v == "OK")
113 }
114
115 pub fn result(&self) -> &str {
117 self.pairs
118 .get("RESULT")
119 .map(|s| s.as_str())
120 .unwrap_or("UNKNOWN")
121 }
122
123 pub fn message(&self) -> Option<&str> {
125 self.pairs.get("MESSAGE").map(|s| s.as_str())
126 }
127}
128
/// Parses space-separated `KEY=VALUE` tokens from `s` into `pairs`.
///
/// * Values may be bare (ending at the next space) or enclosed in double
///   quotes, in which case they may contain spaces.
/// * Inside a quoted value `\"` yields a literal quote and `\\` a literal
///   backslash; any other backslash is kept as-is.
/// * A quoted value without a closing quote extends to the end of the input.
/// * Tokens that contain no `=` are skipped.
/// * A key that appears more than once keeps its last value.
fn parse_key_value_pairs(s: &str, pairs: &mut HashMap<String, String>) {
    let mut rest = s.trim_start_matches(' ');

    while !rest.is_empty() {
        // The key runs up to the first '=' or ' ' (both ASCII, so the split
        // index is always a valid char boundary).
        let key_end = rest
            .find(|c: char| c == '=' || c == ' ')
            .unwrap_or(rest.len());
        let (key, tail) = rest.split_at(key_end);

        let Some(raw_value) = tail.strip_prefix('=') else {
            // Bare token without '=': ignore it and move on.
            rest = tail.trim_start_matches(' ');
            continue;
        };

        let (value, remainder) = match raw_value.strip_prefix('"') {
            Some(quoted) => take_quoted_value(quoted),
            None => {
                let end = raw_value.find(' ').unwrap_or(raw_value.len());
                let (bare, remainder) = raw_value.split_at(end);
                (bare.to_string(), remainder)
            }
        };

        pairs.insert(key.to_string(), value);
        rest = remainder.trim_start_matches(' ');
    }
}

/// Reads a quoted value whose opening quote has already been consumed.
///
/// Returns the unescaped value and whatever input follows the closing quote
/// (empty when the quote is never closed).
fn take_quoted_value(s: &str) -> (String, &str) {
    let mut value = String::new();
    let mut chars = s.char_indices();

    while let Some((idx, c)) = chars.next() {
        match c {
            // '"' is one byte long, so the remainder starts at `idx + 1`.
            '"' => return (value, &s[idx + 1..]),
            '\\' => match chars.clone().next() {
                Some((_, escaped)) if escaped == '"' || escaped == '\\' => {
                    value.push(escaped);
                    chars.next();
                }
                // Not a recognised escape: keep the backslash literally.
                _ => value.push('\\'),
            },
            _ => value.push(c),
        }
    }

    (value, "")
}
183
/// Tunnel settings passed to the SAM bridge as options on `SESSION CREATE`.
///
/// Each field is emitted under the dotted option name shown on it.
// NOTE(review): by I2P option naming, `*.quantity` is presumably the number of
// tunnels and `*.length` the hops per tunnel — confirm against the I2P docs.
#[derive(Debug, Clone)]
pub struct SamTunnelConfig {
    /// Emitted as `inbound.quantity`.
    pub inbound_quantity: u8,
    /// Emitted as `outbound.quantity`.
    pub outbound_quantity: u8,
    /// Emitted as `inbound.length`.
    pub inbound_length: u8,
    /// Emitted as `outbound.length`.
    pub outbound_length: u8,
}

impl Default for SamTunnelConfig {
    /// Every setting defaults to 3.
    fn default() -> Self {
        const DEFAULT_SETTING: u8 = 3;

        Self {
            inbound_quantity: DEFAULT_SETTING,
            outbound_quantity: DEFAULT_SETTING,
            inbound_length: DEFAULT_SETTING,
            outbound_length: DEFAULT_SETTING,
        }
    }
}

impl SamTunnelConfig {
    /// Renders the settings as the space-separated `key=value` option string
    /// appended to the `SESSION CREATE` command.
    fn to_sam_options(&self) -> String {
        let Self {
            inbound_quantity: in_qty,
            outbound_quantity: out_qty,
            inbound_length: in_len,
            outbound_length: out_len,
        } = self;

        format!(
            "inbound.quantity={} outbound.quantity={} inbound.length={} outbound.length={}",
            in_qty, out_qty, in_len, out_len,
        )
    }
}
220
/// A SAM `STYLE=STREAM` session created on a SAM bridge.
///
/// Built by [`SamSession::create`]; the other methods open additional sockets
/// to the same bridge and refer to this session by its id.
pub struct SamSession {
    /// Host of the SAM bridge; reused whenever a new socket is opened.
    sam_host: String,
    /// TCP port of the SAM bridge.
    sam_port: u16,
    /// Destination decoded from the `DESTINATION` field of the
    /// `SESSION STATUS` reply.
    destination: I2pDestination,
    /// Identifier sent as `ID=` in `SESSION CREATE` and the `STREAM` commands.
    session_id: String,
    /// Socket on which the session was created. Never read after creation in
    /// this file.
    // NOTE(review): presumably held only to keep the session alive (in SAM a
    // session is tied to the socket that created it) — confirm.
    _control_stream: TcpStream,
    /// Tunnel settings the session was created with; stored but not read
    /// afterwards, hence the `dead_code` allowance.
    #[allow(dead_code)]
    tunnel_config: SamTunnelConfig,
}
247
248impl fmt::Debug for SamSession {
249 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250 f.debug_struct("SamSession")
251 .field("session_id", &self.session_id)
252 .field("destination", &self.destination)
253 .finish()
254 }
255}
256
/// A stream to a remote I2P peer, as returned by `SamSession::connect` and
/// `SamSession::accept`.
pub struct SamStream {
    /// TCP socket to the SAM bridge on which the stream was set up.
    stream: TcpStream,
    /// Destination of the peer at the other end of the stream.
    remote_destination: I2pDestination,
}
264
265impl SamStream {
266 pub fn remote_destination(&self) -> &I2pDestination {
268 &self.remote_destination
269 }
270
271 pub fn into_inner(self) -> TcpStream {
276 self.stream
277 }
278
279 pub fn inner(&self) -> &TcpStream {
281 &self.stream
282 }
283
284 pub fn inner_mut(&mut self) -> &mut TcpStream {
286 &mut self.stream
287 }
288}
289
290impl SamSession {
291 pub async fn create(
301 host: &str,
302 port: u16,
303 session_id: &str,
304 tunnel_config: SamTunnelConfig,
305 ) -> Result<Self, SamError> {
306 let addr = format!("{host}:{port}");
307 let stream = TcpStream::connect(&addr)
308 .await
309 .map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
310
311 let mut reader = BufReader::new(stream);
312
313 let hello_cmd = "HELLO VERSION MIN=3.1 MAX=3.1\n";
315 reader.get_mut().write_all(hello_cmd.as_bytes()).await?;
316
317 let mut line = String::new();
318 reader.read_line(&mut line).await?;
319 let reply = SamReply::parse(&line)?;
320
321 if reply.major != "HELLO" || reply.minor != "REPLY" || !reply.is_ok() {
322 return Err(SamError::HandshakeFailed(format!(
323 "unexpected reply: {} ({})",
324 reply.result(),
325 reply.message().unwrap_or("no message"),
326 )));
327 }
328
329 let version = reply.pairs.get("VERSION").cloned().unwrap_or_default();
330 debug!("SAM handshake OK, version {version}");
331
332 let tunnel_opts = tunnel_config.to_sam_options();
334 let session_cmd = format!(
335 "SESSION CREATE STYLE=STREAM ID={session_id} DESTINATION=TRANSIENT {tunnel_opts}\n"
336 );
337 reader.get_mut().write_all(session_cmd.as_bytes()).await?;
338
339 line.clear();
340 reader.read_line(&mut line).await?;
341 let reply = SamReply::parse(&line)?;
342
343 if reply.major != "SESSION" || reply.minor != "STATUS" || !reply.is_ok() {
344 return Err(SamError::SessionCreateFailed(format!(
345 "{} ({})",
346 reply.result(),
347 reply.message().unwrap_or("no message"),
348 )));
349 }
350
351 let dest_b64 = reply
353 .pairs
354 .get("DESTINATION")
355 .ok_or_else(|| SamError::SessionCreateFailed("missing DESTINATION in reply".into()))?;
356
357 let destination = I2pDestination::from_base64(dest_b64).map_err(|e| {
358 SamError::InvalidDestination(format!("bad destination in SESSION STATUS: {e}"))
359 })?;
360
361 info!(
362 session_id = session_id,
363 dest_len = destination.len(),
364 "SAM session created"
365 );
366
367 let control_stream = reader.into_inner();
370
371 Ok(SamSession {
372 destination,
373 session_id: session_id.to_string(),
374 sam_host: host.to_string(),
375 sam_port: port,
376 tunnel_config,
377 _control_stream: control_stream,
378 })
379 }
380
381 pub fn destination(&self) -> &I2pDestination {
383 &self.destination
384 }
385
386 pub fn session_id(&self) -> &str {
388 &self.session_id
389 }
390
391 pub async fn connect(&self, dest: &I2pDestination) -> Result<SamStream, SamError> {
399 let addr = format!("{}:{}", self.sam_host, self.sam_port);
400 let stream = TcpStream::connect(&addr)
401 .await
402 .map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
403
404 let mut reader = BufReader::new(stream);
405
406 reader
408 .get_mut()
409 .write_all(b"HELLO VERSION MIN=3.1 MAX=3.1\n")
410 .await?;
411 let mut line = String::new();
412 reader.read_line(&mut line).await?;
413 let reply = SamReply::parse(&line)?;
414 if !reply.is_ok() {
415 return Err(SamError::HandshakeFailed(format!(
416 "connect re-handshake: {}",
417 reply.result()
418 )));
419 }
420
421 let dest_b64 = dest.to_base64();
423 let cmd = format!(
424 "STREAM CONNECT ID={} DESTINATION={} SILENT=false\n",
425 self.session_id, dest_b64,
426 );
427 reader.get_mut().write_all(cmd.as_bytes()).await?;
428
429 line.clear();
430 reader.read_line(&mut line).await?;
431 let reply = SamReply::parse(&line)?;
432
433 if reply.major != "STREAM" || reply.minor != "STATUS" || !reply.is_ok() {
434 return Err(SamError::StreamConnectFailed(format!(
435 "{} ({})",
436 reply.result(),
437 reply.message().unwrap_or("no message"),
438 )));
439 }
440
441 debug!(dest = %dest, "SAM stream connected");
442
443 let stream = reader.into_inner();
445 Ok(SamStream {
446 stream,
447 remote_destination: dest.clone(),
448 })
449 }
450
451 pub async fn accept(&self) -> Result<SamStream, SamError> {
459 let addr = format!("{}:{}", self.sam_host, self.sam_port);
460 let stream = TcpStream::connect(&addr)
461 .await
462 .map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
463
464 let mut reader = BufReader::new(stream);
465
466 reader
468 .get_mut()
469 .write_all(b"HELLO VERSION MIN=3.1 MAX=3.1\n")
470 .await?;
471 let mut line = String::new();
472 reader.read_line(&mut line).await?;
473 let reply = SamReply::parse(&line)?;
474 if !reply.is_ok() {
475 return Err(SamError::HandshakeFailed(format!(
476 "accept re-handshake: {}",
477 reply.result()
478 )));
479 }
480
481 let cmd = format!("STREAM ACCEPT ID={} SILENT=false\n", self.session_id);
483 reader.get_mut().write_all(cmd.as_bytes()).await?;
484
485 line.clear();
486 reader.read_line(&mut line).await?;
487 let reply = SamReply::parse(&line)?;
488
489 if reply.major != "STREAM" || reply.minor != "STATUS" || !reply.is_ok() {
490 return Err(SamError::StreamAcceptFailed(format!(
491 "{} ({})",
492 reply.result(),
493 reply.message().unwrap_or("no message"),
494 )));
495 }
496
497 line.clear();
499 reader.read_line(&mut line).await?;
500 let remote_dest_b64 = line.trim();
501
502 let remote_destination = I2pDestination::from_base64(remote_dest_b64)
503 .map_err(|e| SamError::InvalidDestination(format!("incoming destination: {e}")))?;
504
505 debug!(remote = %remote_destination, "SAM stream accepted");
506
507 let stream = reader.into_inner();
508 Ok(SamStream {
509 stream,
510 remote_destination,
511 })
512 }
513
514 pub async fn naming_lookup(&self, name: &str) -> Result<I2pDestination, SamError> {
519 let addr = format!("{}:{}", self.sam_host, self.sam_port);
520 let stream = TcpStream::connect(&addr)
521 .await
522 .map_err(|e| SamError::ConnectionFailed(format!("{addr}: {e}")))?;
523
524 let mut reader = BufReader::new(stream);
525
526 reader
528 .get_mut()
529 .write_all(b"HELLO VERSION MIN=3.1 MAX=3.1\n")
530 .await?;
531 let mut line = String::new();
532 reader.read_line(&mut line).await?;
533 let reply = SamReply::parse(&line)?;
534 if !reply.is_ok() {
535 return Err(SamError::HandshakeFailed(format!(
536 "naming re-handshake: {}",
537 reply.result()
538 )));
539 }
540
541 let cmd = format!("NAMING LOOKUP NAME={name}\n");
543 reader.get_mut().write_all(cmd.as_bytes()).await?;
544
545 line.clear();
546 reader.read_line(&mut line).await?;
547 let reply = SamReply::parse(&line)?;
548
549 if reply.major != "NAMING" || reply.minor != "REPLY" || !reply.is_ok() {
550 return Err(SamError::NamingLookupFailed(format!(
551 "{}: {} ({})",
552 name,
553 reply.result(),
554 reply.message().unwrap_or("no message"),
555 )));
556 }
557
558 let dest_b64 = reply.pairs.get("VALUE").ok_or_else(|| {
559 SamError::NamingLookupFailed(format!("{name}: missing VALUE in reply"))
560 })?;
561
562 I2pDestination::from_base64(dest_b64)
563 .map_err(|e| SamError::InvalidDestination(format!("{name}: {e}")))
564 }
565}
566
// Unit tests for reply parsing, tunnel options and error conversion.
// None of them opens a socket.
#[cfg(test)]
mod tests {
    use super::*;

    // A HELLO reply yields both leading tokens plus its key/value pairs.
    #[test]
    fn sam_reply_parse_hello() {
        let reply = SamReply::parse("HELLO REPLY RESULT=OK VERSION=3.1").unwrap();
        assert_eq!(reply.major, "HELLO");
        assert_eq!(reply.minor, "REPLY");
        assert!(reply.is_ok());
        assert_eq!(reply.pairs.get("VERSION").unwrap(), "3.1");
    }

    // A non-OK result is exposed via `result()`, and a quoted MESSAGE keeps its spaces.
    #[test]
    fn sam_reply_parse_error() {
        let reply =
            SamReply::parse("SESSION STATUS RESULT=DUPLICATED_ID MESSAGE=\"session exists\"")
                .unwrap();
        assert_eq!(reply.major, "SESSION");
        assert_eq!(reply.minor, "STATUS");
        assert!(!reply.is_ok());
        assert_eq!(reply.result(), "DUPLICATED_ID");
        assert_eq!(reply.message(), Some("session exists"));
    }

    // A long base64 DESTINATION value survives parsing unchanged.
    #[test]
    fn sam_reply_parse_session_create_ok() {
        let dest_b64 = super::super::destination::i2p_base64_encode(&[42u8; 516]);
        let line = format!("SESSION STATUS RESULT=OK DESTINATION={dest_b64}");
        let reply = SamReply::parse(&line).unwrap();
        assert!(reply.is_ok());
        assert_eq!(reply.pairs.get("DESTINATION").unwrap(), &dest_b64);
    }

    // A minimal STREAM STATUS line with only RESULT=OK.
    #[test]
    fn sam_reply_parse_stream_status() {
        let reply = SamReply::parse("STREAM STATUS RESULT=OK").unwrap();
        assert_eq!(reply.major, "STREAM");
        assert_eq!(reply.minor, "STATUS");
        assert!(reply.is_ok());
    }

    // A successful NAMING REPLY exposes both NAME and VALUE.
    #[test]
    fn sam_reply_parse_naming_ok() {
        let dest_b64 = super::super::destination::i2p_base64_encode(&[7u8; 400]);
        let line = format!("NAMING REPLY RESULT=OK NAME=test.i2p VALUE={dest_b64}");
        let reply = SamReply::parse(&line).unwrap();
        assert!(reply.is_ok());
        assert_eq!(reply.pairs.get("NAME").unwrap(), "test.i2p");
        assert_eq!(reply.pairs.get("VALUE").unwrap(), &dest_b64);
    }

    // A failed NAMING REPLY reports its RESULT code.
    #[test]
    fn sam_reply_parse_naming_error() {
        let reply = SamReply::parse("NAMING REPLY RESULT=KEY_NOT_FOUND NAME=unknown.i2p").unwrap();
        assert!(!reply.is_ok());
        assert_eq!(reply.result(), "KEY_NOT_FOUND");
    }

    // An empty line is a protocol error.
    #[test]
    fn sam_reply_parse_empty_line() {
        let err = SamReply::parse("").unwrap_err();
        assert!(matches!(err, SamError::ProtocolError(_)));
    }

    // A line with only one token is a protocol error.
    #[test]
    fn sam_reply_parse_single_token() {
        let err = SamReply::parse("HELLO").unwrap_err();
        assert!(matches!(err, SamError::ProtocolError(_)));
    }

    // Quoted values may contain spaces; the quotes themselves are stripped.
    #[test]
    fn parse_key_value_quoted_message() {
        let mut pairs = HashMap::new();
        parse_key_value_pairs(
            "RESULT=I2P_ERROR MESSAGE=\"tunnel build failed\"",
            &mut pairs,
        );
        assert_eq!(pairs.get("RESULT").unwrap(), "I2P_ERROR");
        assert_eq!(pairs.get("MESSAGE").unwrap(), "tunnel build failed");
    }

    // Several bare pairs separated by single spaces.
    #[test]
    fn parse_key_value_multiple_unquoted() {
        let mut pairs = HashMap::new();
        parse_key_value_pairs("A=1 B=hello C=world", &mut pairs);
        assert_eq!(pairs.get("A").unwrap(), "1");
        assert_eq!(pairs.get("B").unwrap(), "hello");
        assert_eq!(pairs.get("C").unwrap(), "world");
    }

    // Every tunnel setting defaults to 3.
    #[test]
    fn tunnel_config_default() {
        let cfg = SamTunnelConfig::default();
        assert_eq!(cfg.inbound_quantity, 3);
        assert_eq!(cfg.outbound_quantity, 3);
        assert_eq!(cfg.inbound_length, 3);
        assert_eq!(cfg.outbound_length, 3);
    }

    // Each setting appears in the option string under its dotted name.
    #[test]
    fn tunnel_config_to_sam_options() {
        let cfg = SamTunnelConfig {
            inbound_quantity: 2,
            outbound_quantity: 4,
            inbound_length: 1,
            outbound_length: 2,
        };
        let opts = cfg.to_sam_options();
        assert!(opts.contains("inbound.quantity=2"));
        assert!(opts.contains("outbound.quantity=4"));
        assert!(opts.contains("inbound.length=1"));
        assert!(opts.contains("outbound.length=2"));
    }

    // The display text combines the variant's fixed prefix with its detail string.
    #[test]
    fn sam_error_display() {
        let err = SamError::HandshakeFailed("version mismatch".into());
        assert!(err.to_string().contains("handshake"));
        assert!(err.to_string().contains("version mismatch"));
    }

    // `std::io::Error` converts into the `IoError` variant.
    #[test]
    fn sam_error_from_io() {
        let io_err = std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "refused");
        let sam_err = SamError::from(io_err);
        assert!(matches!(sam_err, SamError::IoError(_)));
    }
}