1use dashmap::DashMap;
49use parking_lot::RwLock;
50use std::collections::HashMap;
51use std::net::SocketAddr;
52use std::path::PathBuf;
53use std::sync::Arc;
54use std::time::{Duration, Instant};
55use thiserror::Error;
56use tracing::{debug, info};
57
58#[derive(Debug, Error)]
60pub enum TorError {
61 #[error("Tor not running")]
62 NotRunning,
63
64 #[error("Tor already running")]
65 AlreadyRunning,
66
67 #[error("Connection failed: {0}")]
68 ConnectionFailed(String),
69
70 #[error("Hidden service creation failed: {0}")]
71 HiddenServiceFailed(String),
72
73 #[error("Circuit creation failed: {0}")]
74 CircuitFailed(String),
75
76 #[error("Invalid onion address: {0}")]
77 InvalidOnionAddress(String),
78
79 #[error("SOCKS5 proxy error: {0}")]
80 Socks5Error(String),
81
82 #[error("Tor configuration error: {0}")]
83 ConfigError(String),
84
85 #[error("IO error: {0}")]
86 Io(#[from] std::io::Error),
87}
88
89pub type Result<T> = std::result::Result<T, TorError>;
91
92pub type CircuitId = u32;
94
95pub type StreamId = u64;
97
98pub type OnionAddress = String;
100
/// Lifecycle state of a circuit.
///
/// The type is hashable so that it can be used as a key in hash-based
/// collections (for example, to count circuits per state).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CircuitState {
    /// The circuit is still being built.
    Building,

    /// The circuit is built and currently carries no streams.
    Ready,

    /// The circuit carries at least one stream.
    Active,

    /// The circuit is usable but impaired.
    Degraded,

    /// The circuit failed.
    Failed,

    /// The circuit is being torn down.
    Closing,
}
122
123#[derive(Debug, Clone)]
125pub struct CircuitInfo {
126 pub id: CircuitId,
128
129 pub state: CircuitState,
131
132 pub hops: Vec<String>,
134
135 pub created_at: Instant,
137
138 pub stream_count: usize,
140
141 pub bytes_sent: u64,
143
144 pub bytes_received: u64,
146}
147
/// Settings for a single hidden (onion) service.
///
/// Implements `PartialEq`/`Eq` so two configurations can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HiddenServiceConfig {
    /// Local port of the service.
    // NOTE(review): presumably the port traffic is forwarded to — confirm.
    pub local_port: u16,

    /// Port exposed on the onion address.
    pub virtual_port: u16,

    /// Directory holding the service's data.
    pub data_dir: PathBuf,

    /// Upper bound on simultaneous connections.
    pub max_connections: usize,

    /// Selects the long (56-character, v3) address form when `true`,
    /// otherwise the short (16-character) form.
    pub use_v3: bool,
}
166
167impl Default for HiddenServiceConfig {
168 fn default() -> Self {
169 Self {
170 local_port: 8080,
171 virtual_port: 8080,
172 data_dir: PathBuf::from("/tmp/tor-hidden-service"),
173 max_connections: 100,
174 use_v3: true,
175 }
176 }
177}
178
/// Configuration for the Tor manager.
///
/// Implements `PartialEq`/`Eq` so configurations (for example a preset and a
/// customised copy) can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TorConfig {
    /// Address of the SOCKS proxy.
    pub socks_proxy: SocketAddr,

    /// Address of the control port.
    pub control_port: SocketAddr,

    /// Directory holding Tor's data.
    pub data_dir: PathBuf,

    /// When `true`, every new stream gets a freshly created circuit;
    /// otherwise an existing circuit may be reused.
    pub stream_isolation: bool,

    /// Maximum number of circuits tracked at once; must be greater than zero.
    pub max_circuits: usize,

    /// Age after which a circuit without streams may be discarded.
    pub circuit_timeout: Duration,

    /// Whether bandwidth limiting is requested.
    pub enable_bandwidth_limit: bool,

    /// Bandwidth cap used when limiting is enabled.
    // NOTE(review): unit is implied by the `_bps` suffix only — confirm.
    pub max_bandwidth_bps: u64,

    /// Whether bridges should be used.
    pub use_bridges: bool,

    /// Bridge lines to use when `use_bridges` is set.
    pub bridges: Vec<String>,
}
212
213impl Default for TorConfig {
214 fn default() -> Self {
215 Self {
216 socks_proxy: "127.0.0.1:9050".parse().unwrap(),
217 control_port: "127.0.0.1:9051".parse().unwrap(),
218 data_dir: PathBuf::from("/tmp/tor-data"),
219 stream_isolation: true,
220 max_circuits: 10,
221 circuit_timeout: Duration::from_secs(60),
222 enable_bandwidth_limit: false,
223 max_bandwidth_bps: 0,
224 use_bridges: false,
225 bridges: Vec::new(),
226 }
227 }
228}
229
230impl TorConfig {
231 pub fn high_privacy() -> Self {
233 Self {
234 stream_isolation: true,
235 max_circuits: 5,
236 circuit_timeout: Duration::from_secs(90),
237 enable_bandwidth_limit: true,
238 max_bandwidth_bps: 1_000_000, ..Default::default()
240 }
241 }
242
243 pub fn high_performance() -> Self {
245 Self {
246 stream_isolation: false,
247 max_circuits: 20,
248 circuit_timeout: Duration::from_secs(30),
249 enable_bandwidth_limit: false,
250 max_bandwidth_bps: 0,
251 ..Default::default()
252 }
253 }
254
255 pub fn censorship_resistant() -> Self {
257 Self {
258 use_bridges: true,
259 bridges: vec![
260 "obfs4 192.0.2.1:443".to_string(),
262 "obfs4 192.0.2.2:443".to_string(),
263 ],
264 stream_isolation: true,
265 max_circuits: 8,
266 circuit_timeout: Duration::from_secs(120),
267 ..Default::default()
268 }
269 }
270}
271
/// Counters and gauges describing the manager's activity.
///
/// Implements `PartialEq` so two snapshots can be compared (only `PartialEq`,
/// not `Eq`, because the average build time is a float).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TorStats {
    /// Total number of circuits created so far.
    pub circuits_created: usize,

    /// Number of circuits currently tracked.
    pub active_circuits: usize,

    /// Total number of streams created so far.
    pub streams_created: usize,

    /// Number of streams currently open.
    pub active_streams: usize,

    /// Total bytes sent.
    pub total_bytes_sent: u64,

    /// Total bytes received.
    pub total_bytes_received: u64,

    /// Number of hidden services currently registered.
    pub hidden_services: usize,

    /// Number of failed connection attempts.
    pub connection_failures: usize,

    /// Mean circuit build time in milliseconds over the recent build history.
    pub avg_circuit_build_time_ms: f64,
}
302
303pub struct TorManager {
305 config: TorConfig,
307
308 running: Arc<RwLock<bool>>,
310
311 circuits: Arc<DashMap<CircuitId, CircuitInfo>>,
313
314 next_circuit_id: Arc<RwLock<CircuitId>>,
316
317 stream_circuits: Arc<DashMap<StreamId, CircuitId>>,
319
320 next_stream_id: Arc<RwLock<StreamId>>,
322
323 hidden_services: Arc<DashMap<OnionAddress, HiddenServiceConfig>>,
325
326 stats: Arc<RwLock<TorStats>>,
328
329 circuit_build_times: Arc<RwLock<Vec<f64>>>,
331}
332
333impl TorManager {
334 pub async fn new(config: TorConfig) -> Result<Self> {
336 info!("Creating Tor manager");
337
338 if config.max_circuits == 0 {
340 return Err(TorError::ConfigError(
341 "max_circuits must be > 0".to_string(),
342 ));
343 }
344
345 Ok(Self {
346 config,
347 running: Arc::new(RwLock::new(false)),
348 circuits: Arc::new(DashMap::new()),
349 next_circuit_id: Arc::new(RwLock::new(0)),
350 stream_circuits: Arc::new(DashMap::new()),
351 next_stream_id: Arc::new(RwLock::new(0)),
352 hidden_services: Arc::new(DashMap::new()),
353 stats: Arc::new(RwLock::new(TorStats::default())),
354 circuit_build_times: Arc::new(RwLock::new(Vec::with_capacity(100))),
355 })
356 }
357
358 pub async fn start(&mut self) -> Result<()> {
360 let mut running = self.running.write();
361 if *running {
362 return Err(TorError::AlreadyRunning);
363 }
364
365 info!("Starting Tor manager");
366 info!("SOCKS proxy: {}", self.config.socks_proxy);
367 info!("Control port: {}", self.config.control_port);
368
369 *running = true;
376 info!("Tor manager started");
377
378 Ok(())
379 }
380
381 pub async fn stop(&mut self) -> Result<()> {
383 let mut running = self.running.write();
384 if !*running {
385 return Err(TorError::NotRunning);
386 }
387
388 info!("Stopping Tor manager");
389
390 for circuit in self.circuits.iter() {
392 let circuit_id = circuit.key();
393 debug!("Closing circuit {}", circuit_id);
394 }
395 self.circuits.clear();
396
397 self.hidden_services.clear();
399
400 *running = false;
401 info!("Tor manager stopped");
402
403 Ok(())
404 }
405
406 pub fn is_running(&self) -> bool {
408 *self.running.read()
409 }
410
411 pub async fn create_circuit(&self) -> Result<CircuitId> {
413 if !self.is_running() {
414 return Err(TorError::NotRunning);
415 }
416
417 if self.circuits.len() >= self.config.max_circuits {
418 self.cleanup_circuits();
420
421 if self.circuits.len() >= self.config.max_circuits {
422 return Err(TorError::CircuitFailed(
423 "Maximum circuits reached".to_string(),
424 ));
425 }
426 }
427
428 let circuit_id = {
429 let mut id = self.next_circuit_id.write();
430 let current_id = *id;
431 *id += 1;
432 current_id
433 };
434
435 let start_time = Instant::now();
436
437 let circuit = CircuitInfo {
444 id: circuit_id,
445 state: CircuitState::Ready,
446 hops: vec![
447 "GuardNode".to_string(),
448 "MiddleNode".to_string(),
449 "ExitNode".to_string(),
450 ],
451 created_at: Instant::now(),
452 stream_count: 0,
453 bytes_sent: 0,
454 bytes_received: 0,
455 };
456
457 self.circuits.insert(circuit_id, circuit);
458
459 let build_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
461 let mut build_times = self.circuit_build_times.write();
462 build_times.push(build_time_ms);
463 if build_times.len() > 100 {
464 build_times.remove(0);
465 }
466
467 let mut stats = self.stats.write();
468 stats.circuits_created += 1;
469 stats.active_circuits = self.circuits.len();
470 stats.avg_circuit_build_time_ms =
471 build_times.iter().sum::<f64>() / build_times.len() as f64;
472
473 info!("Created circuit {} in {:.1}ms", circuit_id, build_time_ms);
474
475 Ok(circuit_id)
476 }
477
478 pub async fn connect(&self, address: &str) -> Result<StreamId> {
480 if !self.is_running() {
481 return Err(TorError::NotRunning);
482 }
483
484 let circuit_id = if self.config.stream_isolation {
486 self.create_circuit().await?
488 } else {
489 self.get_or_create_circuit().await?
491 };
492
493 let stream_id = {
494 let mut id = self.next_stream_id.write();
495 let current_id = *id;
496 *id += 1;
497 current_id
498 };
499
500 self.stream_circuits.insert(stream_id, circuit_id);
508
509 if let Some(mut circuit) = self.circuits.get_mut(&circuit_id) {
511 circuit.stream_count += 1;
512 circuit.state = CircuitState::Active;
513 }
514
515 let mut stats = self.stats.write();
516 stats.streams_created += 1;
517 stats.active_streams = self.stream_circuits.len();
518
519 debug!("Connected to {} via circuit {}", address, circuit_id);
520
521 Ok(stream_id)
522 }
523
524 pub async fn create_hidden_service(&self, config: HiddenServiceConfig) -> Result<OnionAddress> {
526 if !self.is_running() {
527 return Err(TorError::NotRunning);
528 }
529
530 let onion_addr = if config.use_v3 {
538 format!(
539 "{}.onion",
540 "abcdefghijklmnopqrstuvabcdefghijklmnopqrstuvabcdefghijkl"
541 )
542 } else {
543 format!("{}.onion", "abcdefghijklmnop")
545 };
546
547 self.hidden_services.insert(onion_addr.clone(), config);
548
549 let mut stats = self.stats.write();
550 stats.hidden_services = self.hidden_services.len();
551
552 info!("Created hidden service: {}", onion_addr);
553
554 Ok(onion_addr)
555 }
556
557 pub async fn remove_hidden_service(&self, onion_addr: &str) -> Result<()> {
559 if !self.is_running() {
560 return Err(TorError::NotRunning);
561 }
562
563 if self.hidden_services.remove(onion_addr).is_some() {
564 let mut stats = self.stats.write();
565 stats.hidden_services = self.hidden_services.len();
566
567 info!("Removed hidden service: {}", onion_addr);
568 Ok(())
569 } else {
570 Err(TorError::InvalidOnionAddress(onion_addr.to_string()))
571 }
572 }
573
574 async fn get_or_create_circuit(&self) -> Result<CircuitId> {
576 let best_circuit = self
578 .circuits
579 .iter()
580 .filter(|entry| {
581 let circuit = entry.value();
582 circuit.state == CircuitState::Ready && circuit.stream_count < 10
583 })
584 .min_by_key(|entry| entry.value().stream_count)
585 .map(|entry| *entry.key());
586
587 if let Some(circuit_id) = best_circuit {
588 Ok(circuit_id)
589 } else {
590 self.create_circuit().await
591 }
592 }
593
594 fn cleanup_circuits(&self) {
596 let now = Instant::now();
597 let timeout = self.config.circuit_timeout;
598
599 let old_circuits: Vec<CircuitId> = self
600 .circuits
601 .iter()
602 .filter(|entry| {
603 let circuit = entry.value();
604 circuit.stream_count == 0 && now.duration_since(circuit.created_at) > timeout
605 })
606 .map(|entry| *entry.key())
607 .collect();
608
609 for circuit_id in old_circuits {
610 debug!("Removing old circuit {}", circuit_id);
611 self.circuits.remove(&circuit_id);
612 }
613
614 let mut stats = self.stats.write();
615 stats.active_circuits = self.circuits.len();
616 }
617
618 pub fn close_stream(&self, stream_id: StreamId) -> Result<()> {
620 if let Some((_, circuit_id)) = self.stream_circuits.remove(&stream_id) {
621 if let Some(mut circuit) = self.circuits.get_mut(&circuit_id) {
623 circuit.stream_count = circuit.stream_count.saturating_sub(1);
624
625 if circuit.stream_count == 0 {
626 circuit.state = CircuitState::Ready;
627 }
628 }
629
630 let mut stats = self.stats.write();
631 stats.active_streams = self.stream_circuits.len();
632
633 debug!("Closed stream {}", stream_id);
634 Ok(())
635 } else {
636 Err(TorError::ConnectionFailed(format!(
637 "Stream {} not found",
638 stream_id
639 )))
640 }
641 }
642
643 pub fn get_circuit(&self, circuit_id: CircuitId) -> Option<CircuitInfo> {
645 self.circuits.get(&circuit_id).map(|e| e.value().clone())
646 }
647
648 pub fn get_circuits(&self) -> Vec<CircuitInfo> {
650 self.circuits
651 .iter()
652 .map(|entry| entry.value().clone())
653 .collect()
654 }
655
656 pub fn get_hidden_services(&self) -> HashMap<OnionAddress, HiddenServiceConfig> {
658 self.hidden_services
659 .iter()
660 .map(|entry| (entry.key().clone(), entry.value().clone()))
661 .collect()
662 }
663
664 pub fn stats(&self) -> TorStats {
666 self.stats.read().clone()
667 }
668
669 pub fn config(&self) -> &TorConfig {
671 &self.config
672 }
673
/// Checks whether `address` has the shape of an onion address.
///
/// An address is accepted when it ends in `.onion` and the part before the
/// suffix is exactly 16 or 56 characters long and consists only of the
/// lowercase base32 alphabet (`a`-`z`, `2`-`7`). Only the shape is checked;
/// no checksum or version byte is verified.
pub fn validate_onion_address(address: &str) -> bool {
    /// Name length of the short address form.
    const SHORT_NAME_LEN: usize = 16;
    /// Name length of the long (v3) address form.
    const LONG_NAME_LEN: usize = 56;

    // `strip_suffix` both tests for the suffix and yields the name, so no
    // separate `ends_with` check or `unwrap` is needed.
    match address.strip_suffix(".onion") {
        Some(name) if matches!(name.len(), SHORT_NAME_LEN | LONG_NAME_LEN) => name
            .bytes()
            // Any non-ASCII byte fails this test, so multi-byte characters
            // are rejected just like other invalid symbols.
            .all(|byte| byte.is_ascii_lowercase() || (b'2'..=b'7').contains(&byte)),
        _ => false,
    }
}
700}
701
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created manager is stopped and has empty statistics.
    #[tokio::test]
    async fn test_manager_creation() {
        let config = TorConfig::default();
        let manager = TorManager::new(config).await.unwrap();

        assert!(!manager.is_running());
        assert_eq!(manager.stats().circuits_created, 0);
    }

    /// `start` and `stop` toggle the running flag.
    #[tokio::test]
    async fn test_start_stop() {
        let config = TorConfig::default();
        let mut manager = TorManager::new(config).await.unwrap();

        assert!(!manager.is_running());

        manager.start().await.unwrap();
        assert!(manager.is_running());

        manager.stop().await.unwrap();
        assert!(!manager.is_running());
    }

    /// The first circuit gets id 0, is ready, has three hops and is counted.
    #[tokio::test]
    async fn test_create_circuit() {
        let config = TorConfig::default();
        let mut manager = TorManager::new(config).await.unwrap();

        manager.start().await.unwrap();

        let circuit_id = manager.create_circuit().await.unwrap();
        assert_eq!(circuit_id, 0);

        let circuit = manager.get_circuit(circuit_id).unwrap();
        assert_eq!(circuit.state, CircuitState::Ready);
        assert_eq!(circuit.hops.len(), 3);

        let stats = manager.stats();
        assert_eq!(stats.circuits_created, 1);
        assert_eq!(stats.active_circuits, 1);
    }

    /// Once idle circuits have expired, a full table accepts a new circuit.
    #[tokio::test]
    async fn test_max_circuits_limit() {
        let config = TorConfig {
            max_circuits: 2,
            circuit_timeout: Duration::from_millis(100),
            ..Default::default()
        };
        let mut manager = TorManager::new(config).await.unwrap();

        manager.start().await.unwrap();

        // Fill the table up to the limit.
        let circuit1 = manager.create_circuit().await.unwrap();
        let _circuit2 = manager.create_circuit().await.unwrap();

        // Make sure the first circuit is idle so it is eligible for cleanup.
        if let Some(mut circuit) = manager.circuits.get_mut(&circuit1) {
            circuit.stream_count = 0;
        }

        // Let the circuits age past the timeout.
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Creation triggers cleanup, which frees a slot.
        let result = manager.create_circuit().await;
        assert!(result.is_ok());
    }

    /// The first stream gets id 0 and is counted.
    #[tokio::test]
    async fn test_connect() {
        let config = TorConfig::default();
        let mut manager = TorManager::new(config).await.unwrap();

        manager.start().await.unwrap();

        let stream_id = manager.connect("example.onion:8080").await.unwrap();
        assert_eq!(stream_id, 0);

        let stats = manager.stats();
        assert_eq!(stats.streams_created, 1);
        assert_eq!(stats.active_streams, 1);
    }

    /// With stream isolation every connection builds its own circuit.
    #[tokio::test]
    async fn test_stream_isolation() {
        let config = TorConfig {
            stream_isolation: true,
            ..Default::default()
        };
        let mut manager = TorManager::new(config).await.unwrap();

        manager.start().await.unwrap();

        manager.connect("example1.onion:8080").await.unwrap();
        manager.connect("example2.onion:8080").await.unwrap();

        let stats = manager.stats();
        assert_eq!(stats.circuits_created, 2);
        assert_eq!(stats.streams_created, 2);
    }

    /// A default (v3) hidden service yields a 56-character name plus suffix.
    #[tokio::test]
    async fn test_hidden_service() {
        let config = TorConfig::default();
        let mut manager = TorManager::new(config).await.unwrap();

        manager.start().await.unwrap();

        let hs_config = HiddenServiceConfig::default();
        let onion_addr = manager.create_hidden_service(hs_config).await.unwrap();

        assert!(onion_addr.ends_with(".onion"));
        // 56 name characters + ".onion" (6 characters).
        assert_eq!(onion_addr.len(), 62);

        let stats = manager.stats();
        assert_eq!(stats.hidden_services, 1);
    }

    /// Removing a hidden service brings the gauge back to zero.
    #[tokio::test]
    async fn test_remove_hidden_service() {
        let config = TorConfig::default();
        let mut manager = TorManager::new(config).await.unwrap();

        manager.start().await.unwrap();

        let hs_config = HiddenServiceConfig::default();
        let onion_addr = manager.create_hidden_service(hs_config).await.unwrap();

        manager.remove_hidden_service(&onion_addr).await.unwrap();

        let stats = manager.stats();
        assert_eq!(stats.hidden_services, 0);
    }

    /// Closing the only stream leaves no active streams.
    #[tokio::test]
    async fn test_close_stream() {
        let config = TorConfig::default();
        let mut manager = TorManager::new(config).await.unwrap();

        manager.start().await.unwrap();

        let stream_id = manager.connect("example.onion:8080").await.unwrap();
        manager.close_stream(stream_id).unwrap();

        let stats = manager.stats();
        assert_eq!(stats.active_streams, 0);
    }

    /// Closing a stream that was never opened is an error.
    #[tokio::test]
    async fn test_close_unknown_stream() {
        let config = TorConfig::default();
        let mut manager = TorManager::new(config).await.unwrap();

        manager.start().await.unwrap();

        assert!(manager.close_stream(42).is_err());
    }

    /// The presets set the fields they advertise.
    #[tokio::test]
    async fn test_config_presets() {
        let high_privacy = TorConfig::high_privacy();
        assert!(high_privacy.stream_isolation);
        assert_eq!(high_privacy.max_circuits, 5);

        let high_performance = TorConfig::high_performance();
        assert!(!high_performance.stream_isolation);
        assert_eq!(high_performance.max_circuits, 20);

        let censorship = TorConfig::censorship_resistant();
        assert!(censorship.use_bridges);
        assert!(!censorship.bridges.is_empty());
    }

    /// Shape validation accepts 56- and 16-character names and rejects the rest.
    #[test]
    fn test_validate_onion_address() {
        // 56-character name.
        let v3_addr = "abcdefghijklmnopqrstuvabcdefghijklmnopqrstuvabcdefghijkl.onion";
        assert!(TorManager::validate_onion_address(v3_addr));

        // 16-character name.
        let v2_addr = "abcdefghijklmnop.onion";
        assert!(TorManager::validate_onion_address(v2_addr));

        // Missing suffix, wrong suffix, too short.
        assert!(!TorManager::validate_onion_address("invalid"));
        assert!(!TorManager::validate_onion_address("example.com"));
        assert!(!TorManager::validate_onion_address("abc.onion"));

        // Valid alphabet but 55 characters long.
        assert!(!TorManager::validate_onion_address(
            "abcdefghijklmnopqrstuvwxyz234567abcdefghijklmnopqrstuvw.onion"
        ));
    }

    /// An idle circuit older than the timeout is removed by cleanup.
    #[tokio::test]
    async fn test_circuit_cleanup() {
        let config = TorConfig {
            circuit_timeout: Duration::from_millis(100),
            ..Default::default()
        };
        let mut manager = TorManager::new(config).await.unwrap();

        manager.start().await.unwrap();

        let circuit_id = manager.create_circuit().await.unwrap();
        assert!(manager.get_circuit(circuit_id).is_some());

        // Let the idle circuit age past `circuit_timeout`.
        tokio::time::sleep(Duration::from_millis(150)).await;

        manager.cleanup_circuits();

        // The circuit had no streams and outlived the timeout, so it is gone.
        assert!(manager.get_circuit(circuit_id).is_none());
        assert!(manager.get_circuits().is_empty());

        let stats = manager.stats();
        assert_eq!(stats.active_circuits, 0);
        assert_eq!(stats.circuits_created, 1);
    }

    /// Operations that need a running manager fail while it is stopped.
    #[tokio::test]
    async fn test_not_running_errors() {
        let config = TorConfig::default();
        let manager = TorManager::new(config).await.unwrap();

        assert!(manager.create_circuit().await.is_err());
        assert!(manager.connect("example.onion:8080").await.is_err());

        let hs_config = HiddenServiceConfig::default();
        assert!(manager.create_hidden_service(hs_config).await.is_err());
        assert!(manager
            .remove_hidden_service("abcdefghijklmnop.onion")
            .await
            .is_err());
    }
}