1use std::str::FromStr;
2use std::{env, fs, io, net, path, result};
3
4use regex::Regex;
5use std::os::unix::fs::PermissionsExt;
6use thiserror::Error;
7use torut::{control, onion};
8
9pub mod config;
10pub mod parse;
11pub mod secrets;
12
13#[derive(Error, Debug)]
14pub enum PipeError {
15 #[error("timeout connecting to tor control socket")]
16 ConnTimeout,
17 #[error("failed to connect to tor control socket")]
18 Conn(#[from] control::ConnError),
19 #[error("i/o error: {0}", .source)]
20 IO {
21 #[from]
22 source: io::Error,
23 },
25 #[error("socks error: {0}")]
26 Socks(#[from] tokio_socks::Error),
27 #[error("join error: {0}")]
28 Join(#[from] tokio::task::JoinError),
29 #[error("invalid socket address: {0}")]
30 ParseAddr(#[from] net::AddrParseError),
31 #[error("command failed: {0}")]
32 CLI(String),
33 #[error("invalid config: {0}")]
34 Config(String),
35 #[error("config parse error: {0}")]
36 ConfigParse(#[from] serde_json::Error),
37 #[error("secret store error: {0}")]
38 SecretStore(#[from] secrets::SecretsError),
39 #[error("forward parse error: {0}")]
40 ForwardParse(#[from] parse::ParseError),
41 #[error("onion address parse error: {0}")]
42 OnionAddr(#[from] torut::onion::OnionAddressParseError),
43}
44
45pub type Result<T> = result::Result<T, PipeError>;
46
47pub struct OnionPipeBuilder {
48 temp_dir: path::PathBuf,
49 exports: Vec<Export>,
50 imports: Vec<Import>,
51 secret_store: Option<secrets::SecretStore>,
52}
53
54impl OnionPipeBuilder {
55 pub fn temp_dir(mut self, temp_dir: &str) -> OnionPipeBuilder {
56 self.temp_dir = path::PathBuf::from(temp_dir);
57 self
58 }
59
60 pub fn secrets_dir(mut self, secrets_dir: &str) -> OnionPipeBuilder {
61 let secrets_dir = path::PathBuf::from(secrets_dir);
62 self.secret_store = Some(secrets::SecretStore::new(secrets_dir.to_str().unwrap()));
63 self
64 }
65
66 pub fn export(mut self, export: Export) -> OnionPipeBuilder {
67 self.exports.push(export);
68 self
69 }
70
71 pub fn import(mut self, import: Import) -> OnionPipeBuilder {
72 self.imports.push(import);
73 self
74 }
75
76 pub fn config(mut self, cfg: config::Config) -> Result<OnionPipeBuilder> {
77 if let Some(secrets_dir) = cfg.secrets_dir {
78 self = self.secrets_dir(&secrets_dir);
79 }
80 for cfg_export in cfg.exports {
81 let export = match (cfg_export, self.secret_store.as_mut()).try_into() {
82 Ok(item) => item,
83 Err(err) => return Err(err),
84 };
85 self.exports.push(export);
86 }
87 for cfg_import in cfg.imports {
88 let import = match cfg_import.try_into() {
89 Ok(item) => item,
90 Err(err) => return Err(err),
91 };
92 self.imports.push(import);
93 }
94 if let Some(temp_dir) = cfg.temp_dir {
95 self = self.temp_dir(&temp_dir)
96 }
97 Ok(self)
98 }
99
100 pub async fn new(self) -> Result<OnionPipe> {
101 let temp_dir = tempfile::tempdir_in(self.temp_dir)?;
102 let data_dir = temp_dir.path().join("data");
103 tokio::fs::create_dir(data_dir.as_path()).await?;
104 tokio::fs::set_permissions(data_dir.as_path(), fs::Permissions::from_mode(0o700)).await?;
105 let control_sock = data_dir.join("control.sock").to_str().unwrap().into();
106 let socks_sock = data_dir.join("socks.sock").to_str().unwrap().into();
107 Ok(OnionPipe {
108 temp_dir: Some(temp_dir),
109 data_dir: data_dir.to_str().unwrap().into(),
110 control_sock: control_sock,
111 socks_sock: socks_sock,
112 exports: self.exports,
113 imports: self.imports,
114 })
115 }
116}
117
118pub struct OnionPipe {
119 temp_dir: Option<tempfile::TempDir>,
120 data_dir: String,
121 control_sock: String,
122 socks_sock: String,
123 exports: Vec<Export>,
124 imports: Vec<Import>,
125}
126
127pub struct Export {
128 pub local_addr: net::SocketAddr,
129 pub remote_key: onion::TorSecretKeyV3,
130 pub remote_ports: Vec<u16>,
131}
132
133impl TryInto<Export> for (config::Export, Option<&mut secrets::SecretStore>) {
134 type Error = PipeError;
135
136 fn try_into(self) -> Result<Export> {
137 let remote_key = match (self.0.service_name, self.1) {
138 (Some(ref service_name), Some(secret_store)) => {
139 let key_bytes = secret_store.ensure_service(service_name)?;
140 torut::onion::TorSecretKeyV3::from(key_bytes)
141 }
142 (Some(_), None) => {
143 return Err(PipeError::Config("secret store not configured".to_string()))
144 }
145 (None, _) => torut::onion::TorSecretKeyV3::generate(),
146 };
147 Ok(Export {
148 local_addr: std::net::SocketAddr::from_str(self.0.local_addr.as_str())?,
149 remote_key: remote_key,
150 remote_ports: self.0.remote_ports,
151 })
152 }
153}
154
155pub struct Import {
156 pub remote_addr: onion::OnionAddress,
157 pub remote_port: u16,
158 pub local_addr: net::SocketAddr,
159}
160
161impl TryInto<Import> for config::Import {
162 type Error = PipeError;
163
164 fn try_into(self) -> Result<Import> {
165 let (remote_addr, remote_port) = parse_onion_address(&self.remote_addr)?;
166 Ok(Import {
167 remote_addr: torut::onion::OnionAddress::V3(remote_addr),
168 remote_port: remote_port,
169 local_addr: std::net::SocketAddr::from_str(self.local_addr.as_str())?,
170 })
171 }
172}
173
174fn parse_err(addr: &str) -> PipeError {
175 return PipeError::Config(format!("invalid onion address {}", addr).to_string());
176}
177
178fn parse_onion_address(addr: &str) -> Result<(torut::onion::OnionAddressV3, u16)> {
179 let re = Regex::new(r"^(?P<onion>[^\.]+)(\.onion)?(:(?P<port>\d+))$")
180 .map_err(|_| parse_err(addr))?;
181 match re.captures(addr) {
182 Some(captures) => {
183 let (remote_addr, remote_port) = match (captures.name("onion"), captures.name("port")) {
184 (Some(onion), Some(port)) => (
185 torut::onion::OnionAddressV3::from_str(onion.as_str())?,
186 port.as_str().parse::<u16>().map_err(|_| parse_err(addr))?,
187 ),
188 (Some(onion), None) => (
189 torut::onion::OnionAddressV3::from_str(onion.as_str())?,
190 80u16,
191 ),
192 _ => return Err(parse_err(addr)),
193 };
194 Ok((remote_addr, remote_port))
195 }
196 None => Err(parse_err(addr)),
197 }
198}
199
200pub enum Forward {
201 Export(Export),
202 Import(Import),
203}
204
205async fn on_event_noop(
206 _: torut::control::AsyncEvent<'static>,
207) -> result::Result<(), torut::control::ConnError> {
208 Ok(())
209}
210
211impl OnionPipe {
212 pub fn defaults() -> OnionPipeBuilder {
213 OnionPipeBuilder {
214 temp_dir: env::temp_dir(),
215 exports: vec![],
216 imports: vec![],
217 secret_store: None,
218 }
219 }
220
221 pub async fn run(&mut self) -> Result<()> {
222 self.start_tor();
223
224 wait_for_file(&self.control_sock).await?;
225 let s = tokio::net::UnixStream::connect(&self.control_sock).await?;
226 let mut utc = control::UnauthenticatedConn::new(s);
227 utc.authenticate(&control::TorAuthData::Null).await?;
228 let mut ac = utc.into_authenticated().await;
229 ac.set_async_event_handler(Some(on_event_noop));
230 ac.take_ownership().await?;
231
232 let mut active_onions = vec![];
233 for i in 0..self.exports.len() {
234 let export = &self.exports[i];
235 let remote_key = &export.remote_key;
236 println!(
237 "forward {} => {}:{}",
238 export.local_addr,
239 remote_key.public().get_onion_address(),
240 export
241 .remote_ports
242 .iter()
243 .map(|port| port.to_string())
244 .collect::<Vec<_>>()
245 .join(","),
246 );
247 ac.add_onion_v3(
248 remote_key,
249 false,
250 false,
251 false,
252 None,
253 &mut export
254 .remote_ports
255 .iter()
256 .map(|port| (port.to_owned(), export.local_addr))
257 .collect::<Vec<_>>()
258 .iter(),
259 )
260 .await?;
261 active_onions.push(
262 remote_key
263 .public()
264 .get_onion_address()
265 .get_address_without_dot_onion(),
266 );
267 }
268
269 self.forward_imports().await?;
270
271 tokio::signal::ctrl_c().await?;
272 eprintln!("interrupt received, shutting down");
273
274 for i in 0..active_onions.len() {
275 match ac.del_onion(active_onions[i].as_str()).await {
276 Err(control::ConnError::IOError(io_err)) => {
277 if io_err.kind() == std::io::ErrorKind::ConnectionReset {
278 break;
280 }
281 eprintln!("failed to delete onion: {:?}", io_err);
282 }
283 Err(err) => {
284 eprintln!("failed to delete onion: {:?}", err);
285 }
286 _ => {}
287 }
288 }
289 drop(ac);
293 tokio::fs::remove_dir_all(&self.data_dir).await?;
295 self.temp_dir.take().unwrap().close()?;
297 Ok(())
298 }
299
300 async fn forward_imports(&self) -> Result<()> {
301 for i in 0..self.imports.len() {
302 let import = &self.imports[i];
303 let import_addr = format!("{}:{}", import.remote_addr, import.remote_port);
304 let socks_addr = self.socks_sock.to_string();
305
306 tokio::spawn(run_import(
307 import.local_addr.to_string(),
308 socks_addr,
309 import_addr.to_string(),
310 ));
311
312 println!("forward {} => {}", import_addr, import.local_addr,);
313 }
314 Ok(())
315 }
316
317 fn start_tor(&self) -> () {
318 libtor::Tor::new()
320 .flag(libtor::TorFlag::ControlSocket(
321 self.control_sock.as_str().into(),
322 ))
323 .flag(libtor::TorFlag::DataDirectory(
324 self.data_dir.as_str().into(),
325 ))
326 .flag(libtor::TorFlag::LogTo(
328 libtor::log::LogLevel::Warn,
329 libtor::log::LogDestination::Stderr,
330 ))
331 .flag(libtor::TorFlag::Custom(
332 format!(
333 "SocksPort unix:{} OnionTrafficOnly",
334 self.socks_sock.as_str()
335 )
336 .into(),
337 ))
338 .start_background();
339 }
340}
341
342async fn run_import(local_addr: String, socks_addr: String, import_addr: String) -> Result<()> {
343 let local_listener = tokio::net::TcpListener::bind(local_addr).await?;
344 loop {
345 let (local_stream, _) = local_listener.accept().await?;
346 println!("got connection");
347 let proxy_stream = match tokio::net::UnixStream::connect(socks_addr.as_str()).await {
348 Ok(s) => s,
349 Err(e) => {
350 eprintln!("socks proxy connection failed: {}", e);
351 continue;
352 }
353 };
354 let remote_stream = match tokio_socks::tcp::Socks5Stream::connect_with_socket(
355 proxy_stream,
356 import_addr.as_str(),
357 )
358 .await
359 {
360 Ok(s) => s,
361 Err(e) => {
362 eprintln!("remote onion connection failed: {}", e);
363 continue;
364 }
365 };
366 tokio::spawn(forward_stream(local_stream, remote_stream));
367 }
368}
369
370async fn forward_stream(
371 mut local: tokio::net::TcpStream,
372 mut remote: tokio_socks::tcp::Socks5Stream<tokio::net::UnixStream>,
373) -> Result<()> {
374 let (mut local_read, mut local_write) = local.split();
375 let (mut remote_read, mut remote_write) = remote.split();
376 tokio::select! {
377 _ = async {
378 tokio::io::copy(&mut remote_read, &mut local_write).await?;
379 Ok::<_, PipeError>(())
380 } => {}
381 _ = async {
382 tokio::io::copy(&mut local_read, &mut remote_write).await?;
383 Ok::<_, PipeError>(())
384 } => {}
385 else => {}
386 };
387 Ok(())
388}
389
390async fn wait_for_file(path: &str) -> Result<()> {
391 for i in 0..10 {
392 match tokio::fs::metadata(path).await {
393 Ok(_) => {
394 return Ok(());
395 }
396 Err(_) => {}
397 }
398 tokio::time::sleep(tokio::time::Duration::from_secs(i)).await;
399 }
400 Err(PipeError::ConnTimeout)
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an export config entry that exposes port 4567.
    fn export_entry(local_addr: &str, service_name: Option<&str>) -> config::Export {
        config::Export {
            local_addr: local_addr.to_string(),
            service_name: service_name.map(|name| name.to_string()),
            remote_ports: vec![4567],
        }
    }

    /// Creates a secret store inside a fresh temporary directory.
    ///
    /// The directory guard is returned so the caller keeps it alive for as
    /// long as the store is in use.
    fn scratch_store() -> (tempfile::TempDir, secrets::SecretStore) {
        let tmp_dir = tempfile::tempdir().unwrap();
        let secrets_dir = tmp_dir.path().join("secrets");
        let store = secrets::SecretStore::new(secrets_dir.to_str().unwrap());
        (tmp_dir, store)
    }

    /// Length of the export's onion address without the `.onion` suffix.
    fn onion_name_len(export: &Export) -> usize {
        export
            .remote_key
            .public()
            .get_onion_address()
            .get_address_without_dot_onion()
            .as_str()
            .len()
    }

    /// A named service gets its key from the store, and the same name yields
    /// the same key on a second conversion.
    #[test]
    fn try_into_export() {
        let (_tmp_dir, mut store) = scratch_store();

        let first: Export = (
            export_entry("127.0.0.1:4566", Some("some_service")),
            Some(&mut store),
        )
            .try_into()
            .unwrap();
        assert_eq!("127.0.0.1:4566".parse(), Ok(first.local_addr));
        assert_eq!(
            onion_name_len(&first),
            "wdz54gdzddxqigr27g5ivc4q3ekfrpmhe45yyb75kzhrkl577yalq7qd".len()
        );
        assert_eq!(first.remote_ports, vec![4567]);
        assert_eq!(store.list_services().unwrap(), vec!["some_service"]);

        let second: Export = (
            export_entry("127.0.0.1:4566", Some("some_service")),
            Some(&mut store),
        )
            .try_into()
            .unwrap();
        assert_eq!(first.remote_key, second.remote_key);
        assert_eq!(store.list_services().unwrap(), vec!["some_service"]);
    }

    /// Without a service name a key is generated and the conversion succeeds.
    #[test]
    fn try_into_export_new_onion() {
        let (_tmp_dir, mut store) = scratch_store();

        let export: Export = (export_entry("127.0.0.1:4566", None), Some(&mut store))
            .try_into()
            .unwrap();
        assert_eq!("127.0.0.1:4566".parse(), Ok(export.local_addr));
        assert_eq!(onion_name_len(&export), 56);
        assert_eq!(export.remote_ports, vec![4567]);
    }

    /// A unix socket path is not a socket address and is rejected.
    #[test]
    fn try_into_export_unix() {
        let (_tmp_dir, mut store) = scratch_store();

        let result: Result<Export> =
            (export_entry("unix:/tmp/foo.sock", None), Some(&mut store)).try_into();
        assert!(matches!(result, Err(PipeError::ParseAddr(_))));
    }
}