onionpipe/
lib.rs

1use std::str::FromStr;
2use std::{env, fs, io, net, path, result};
3
4use regex::Regex;
5use std::os::unix::fs::PermissionsExt;
6use thiserror::Error;
7use torut::{control, onion};
8
9pub mod config;
10pub mod parse;
11pub mod secrets;
12
13#[derive(Error, Debug)]
14pub enum PipeError {
15    #[error("timeout connecting to tor control socket")]
16    ConnTimeout,
17    #[error("failed to connect to tor control socket")]
18    Conn(#[from] control::ConnError),
19    #[error("i/o error: {0}", .source)]
20    IO {
21        #[from]
22        source: io::Error,
23        //backtrace: std::backtrace::Backtrace,
24    },
25    #[error("socks error: {0}")]
26    Socks(#[from] tokio_socks::Error),
27    #[error("join error: {0}")]
28    Join(#[from] tokio::task::JoinError),
29    #[error("invalid socket address: {0}")]
30    ParseAddr(#[from] net::AddrParseError),
31    #[error("command failed: {0}")]
32    CLI(String),
33    #[error("invalid config: {0}")]
34    Config(String),
35    #[error("config parse error: {0}")]
36    ConfigParse(#[from] serde_json::Error),
37    #[error("secret store error: {0}")]
38    SecretStore(#[from] secrets::SecretsError),
39    #[error("forward parse error: {0}")]
40    ForwardParse(#[from] parse::ParseError),
41    #[error("onion address parse error: {0}")]
42    OnionAddr(#[from] torut::onion::OnionAddressParseError),
43}
44
45pub type Result<T> = result::Result<T, PipeError>;
46
47pub struct OnionPipeBuilder {
48    temp_dir: path::PathBuf,
49    exports: Vec<Export>,
50    imports: Vec<Import>,
51    secret_store: Option<secrets::SecretStore>,
52}
53
54impl OnionPipeBuilder {
55    pub fn temp_dir(mut self, temp_dir: &str) -> OnionPipeBuilder {
56        self.temp_dir = path::PathBuf::from(temp_dir);
57        self
58    }
59
60    pub fn secrets_dir(mut self, secrets_dir: &str) -> OnionPipeBuilder {
61        let secrets_dir = path::PathBuf::from(secrets_dir);
62        self.secret_store = Some(secrets::SecretStore::new(secrets_dir.to_str().unwrap()));
63        self
64    }
65
66    pub fn export(mut self, export: Export) -> OnionPipeBuilder {
67        self.exports.push(export);
68        self
69    }
70
71    pub fn import(mut self, import: Import) -> OnionPipeBuilder {
72        self.imports.push(import);
73        self
74    }
75
76    pub fn config(mut self, cfg: config::Config) -> Result<OnionPipeBuilder> {
77        if let Some(secrets_dir) = cfg.secrets_dir {
78            self = self.secrets_dir(&secrets_dir);
79        }
80        for cfg_export in cfg.exports {
81            let export = match (cfg_export, self.secret_store.as_mut()).try_into() {
82                Ok(item) => item,
83                Err(err) => return Err(err),
84            };
85            self.exports.push(export);
86        }
87        for cfg_import in cfg.imports {
88            let import = match cfg_import.try_into() {
89                Ok(item) => item,
90                Err(err) => return Err(err),
91            };
92            self.imports.push(import);
93        }
94        if let Some(temp_dir) = cfg.temp_dir {
95            self = self.temp_dir(&temp_dir)
96        }
97        Ok(self)
98    }
99
100    pub async fn new(self) -> Result<OnionPipe> {
101        let temp_dir = tempfile::tempdir_in(self.temp_dir)?;
102        let data_dir = temp_dir.path().join("data");
103        tokio::fs::create_dir(data_dir.as_path()).await?;
104        tokio::fs::set_permissions(data_dir.as_path(), fs::Permissions::from_mode(0o700)).await?;
105        let control_sock = data_dir.join("control.sock").to_str().unwrap().into();
106        let socks_sock = data_dir.join("socks.sock").to_str().unwrap().into();
107        Ok(OnionPipe {
108            temp_dir: Some(temp_dir),
109            data_dir: data_dir.to_str().unwrap().into(),
110            control_sock: control_sock,
111            socks_sock: socks_sock,
112            exports: self.exports,
113            imports: self.imports,
114        })
115    }
116}
117
118pub struct OnionPipe {
119    temp_dir: Option<tempfile::TempDir>,
120    data_dir: String,
121    control_sock: String,
122    socks_sock: String,
123    exports: Vec<Export>,
124    imports: Vec<Import>,
125}
126
127pub struct Export {
128    pub local_addr: net::SocketAddr,
129    pub remote_key: onion::TorSecretKeyV3,
130    pub remote_ports: Vec<u16>,
131}
132
133impl TryInto<Export> for (config::Export, Option<&mut secrets::SecretStore>) {
134    type Error = PipeError;
135
136    fn try_into(self) -> Result<Export> {
137        let remote_key = match (self.0.service_name, self.1) {
138            (Some(ref service_name), Some(secret_store)) => {
139                let key_bytes = secret_store.ensure_service(service_name)?;
140                torut::onion::TorSecretKeyV3::from(key_bytes)
141            }
142            (Some(_), None) => {
143                return Err(PipeError::Config("secret store not configured".to_string()))
144            }
145            (None, _) => torut::onion::TorSecretKeyV3::generate(),
146        };
147        Ok(Export {
148            local_addr: std::net::SocketAddr::from_str(self.0.local_addr.as_str())?,
149            remote_key: remote_key,
150            remote_ports: self.0.remote_ports,
151        })
152    }
153}
154
155pub struct Import {
156    pub remote_addr: onion::OnionAddress,
157    pub remote_port: u16,
158    pub local_addr: net::SocketAddr,
159}
160
161impl TryInto<Import> for config::Import {
162    type Error = PipeError;
163
164    fn try_into(self) -> Result<Import> {
165        let (remote_addr, remote_port) = parse_onion_address(&self.remote_addr)?;
166        Ok(Import {
167            remote_addr: torut::onion::OnionAddress::V3(remote_addr),
168            remote_port: remote_port,
169            local_addr: std::net::SocketAddr::from_str(self.local_addr.as_str())?,
170        })
171    }
172}
173
174fn parse_err(addr: &str) -> PipeError {
175    return PipeError::Config(format!("invalid onion address {}", addr).to_string());
176}
177
178fn parse_onion_address(addr: &str) -> Result<(torut::onion::OnionAddressV3, u16)> {
179    let re = Regex::new(r"^(?P<onion>[^\.]+)(\.onion)?(:(?P<port>\d+))$")
180        .map_err(|_| parse_err(addr))?;
181    match re.captures(addr) {
182        Some(captures) => {
183            let (remote_addr, remote_port) = match (captures.name("onion"), captures.name("port")) {
184                (Some(onion), Some(port)) => (
185                    torut::onion::OnionAddressV3::from_str(onion.as_str())?,
186                    port.as_str().parse::<u16>().map_err(|_| parse_err(addr))?,
187                ),
188                (Some(onion), None) => (
189                    torut::onion::OnionAddressV3::from_str(onion.as_str())?,
190                    80u16,
191                ),
192                _ => return Err(parse_err(addr)),
193            };
194            Ok((remote_addr, remote_port))
195        }
196        None => Err(parse_err(addr)),
197    }
198}
199
200pub enum Forward {
201    Export(Export),
202    Import(Import),
203}
204
205async fn on_event_noop(
206    _: torut::control::AsyncEvent<'static>,
207) -> result::Result<(), torut::control::ConnError> {
208    Ok(())
209}
210
211impl OnionPipe {
212    pub fn defaults() -> OnionPipeBuilder {
213        OnionPipeBuilder {
214            temp_dir: env::temp_dir(),
215            exports: vec![],
216            imports: vec![],
217            secret_store: None,
218        }
219    }
220
221    pub async fn run(&mut self) -> Result<()> {
222        self.start_tor();
223
224        wait_for_file(&self.control_sock).await?;
225        let s = tokio::net::UnixStream::connect(&self.control_sock).await?;
226        let mut utc = control::UnauthenticatedConn::new(s);
227        utc.authenticate(&control::TorAuthData::Null).await?;
228        let mut ac = utc.into_authenticated().await;
229        ac.set_async_event_handler(Some(on_event_noop));
230        ac.take_ownership().await?;
231
232        let mut active_onions = vec![];
233        for i in 0..self.exports.len() {
234            let export = &self.exports[i];
235            let remote_key = &export.remote_key;
236            println!(
237                "forward {} => {}:{}",
238                export.local_addr,
239                remote_key.public().get_onion_address(),
240                export
241                    .remote_ports
242                    .iter()
243                    .map(|port| port.to_string())
244                    .collect::<Vec<_>>()
245                    .join(","),
246            );
247            ac.add_onion_v3(
248                remote_key,
249                false,
250                false,
251                false,
252                None,
253                &mut export
254                    .remote_ports
255                    .iter()
256                    .map(|port| (port.to_owned(), export.local_addr))
257                    .collect::<Vec<_>>()
258                    .iter(),
259            )
260            .await?;
261            active_onions.push(
262                remote_key
263                    .public()
264                    .get_onion_address()
265                    .get_address_without_dot_onion(),
266            );
267        }
268
269        self.forward_imports().await?;
270
271        tokio::signal::ctrl_c().await?;
272        eprintln!("interrupt received, shutting down");
273
274        for i in 0..active_onions.len() {
275            match ac.del_onion(active_onions[i].as_str()).await {
276                Err(control::ConnError::IOError(io_err)) => {
277                    if io_err.kind() == std::io::ErrorKind::ConnectionReset {
278                        // Control connection may be lost here
279                        break;
280                    }
281                    eprintln!("failed to delete onion: {:?}", io_err);
282                }
283                Err(err) => {
284                    eprintln!("failed to delete onion: {:?}", err);
285                }
286                _ => {}
287            }
288        }
289        // TODO: poll w/timeout for a connection reset, ping w/ GETINFO
290
291        // Close connection
292        drop(ac);
293        // Delete data dir
294        tokio::fs::remove_dir_all(&self.data_dir).await?;
295        // Clean up temp dir
296        self.temp_dir.take().unwrap().close()?;
297        Ok(())
298    }
299
300    async fn forward_imports(&self) -> Result<()> {
301        for i in 0..self.imports.len() {
302            let import = &self.imports[i];
303            let import_addr = format!("{}:{}", import.remote_addr, import.remote_port);
304            let socks_addr = self.socks_sock.to_string();
305
306            tokio::spawn(run_import(
307                import.local_addr.to_string(),
308                socks_addr,
309                import_addr.to_string(),
310            ));
311
312            println!("forward {} => {}", import_addr, import.local_addr,);
313        }
314        Ok(())
315    }
316
317    fn start_tor(&self) -> () {
318        // TODO(long-term): replace with Arti when it supports onions!
319        libtor::Tor::new()
320            .flag(libtor::TorFlag::ControlSocket(
321                self.control_sock.as_str().into(),
322            ))
323            .flag(libtor::TorFlag::DataDirectory(
324                self.data_dir.as_str().into(),
325            ))
326            // TODO: configurable log level
327            .flag(libtor::TorFlag::LogTo(
328                libtor::log::LogLevel::Warn,
329                libtor::log::LogDestination::Stderr,
330            ))
331            .flag(libtor::TorFlag::Custom(
332                format!(
333                    "SocksPort unix:{} OnionTrafficOnly",
334                    self.socks_sock.as_str()
335                )
336                .into(),
337            ))
338            .start_background();
339    }
340}
341
342async fn run_import(local_addr: String, socks_addr: String, import_addr: String) -> Result<()> {
343    let local_listener = tokio::net::TcpListener::bind(local_addr).await?;
344    loop {
345        let (local_stream, _) = local_listener.accept().await?;
346        println!("got connection");
347        let proxy_stream = match tokio::net::UnixStream::connect(socks_addr.as_str()).await {
348            Ok(s) => s,
349            Err(e) => {
350                eprintln!("socks proxy connection failed: {}", e);
351                continue;
352            }
353        };
354        let remote_stream = match tokio_socks::tcp::Socks5Stream::connect_with_socket(
355            proxy_stream,
356            import_addr.as_str(),
357        )
358        .await
359        {
360            Ok(s) => s,
361            Err(e) => {
362                eprintln!("remote onion connection failed: {}", e);
363                continue;
364            }
365        };
366        tokio::spawn(forward_stream(local_stream, remote_stream));
367    }
368}
369
370async fn forward_stream(
371    mut local: tokio::net::TcpStream,
372    mut remote: tokio_socks::tcp::Socks5Stream<tokio::net::UnixStream>,
373) -> Result<()> {
374    let (mut local_read, mut local_write) = local.split();
375    let (mut remote_read, mut remote_write) = remote.split();
376    tokio::select! {
377        _ = async {
378            tokio::io::copy(&mut remote_read, &mut local_write).await?;
379            Ok::<_, PipeError>(())
380        } => {}
381        _ = async {
382            tokio::io::copy(&mut local_read, &mut remote_write).await?;
383            Ok::<_, PipeError>(())
384        } => {}
385        else => {}
386    };
387    Ok(())
388}
389
390async fn wait_for_file(path: &str) -> Result<()> {
391    for i in 0..10 {
392        match tokio::fs::metadata(path).await {
393            Ok(_) => {
394                return Ok(());
395            }
396            Err(_) => {}
397        }
398        tokio::time::sleep(tokio::time::Duration::from_secs(i)).await;
399    }
400    Err(PipeError::ConnTimeout)
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406
407    #[test]
408    fn try_into_export() {
409        let export_config = config::Export {
410            local_addr: "127.0.0.1:4566".to_string(),
411            service_name: Some("some_service".to_string()),
412            remote_ports: vec![4567],
413        };
414        let tmp_dir = tempfile::tempdir().unwrap();
415        let secrets_dir = tmp_dir.path().join("secrets");
416        let mut store = secrets::SecretStore::new(secrets_dir.to_str().unwrap());
417
418        let export: Export = (export_config, Some(&mut store)).try_into().unwrap();
419        assert_eq!("127.0.0.1:4566".parse(), Ok(export.local_addr));
420        assert_eq!(
421            export
422                .remote_key
423                .public()
424                .get_onion_address()
425                .get_address_without_dot_onion()
426                .as_str()
427                .len(),
428            "wdz54gdzddxqigr27g5ivc4q3ekfrpmhe45yyb75kzhrkl577yalq7qd".len()
429        );
430        assert_eq!(export.remote_ports, vec![4567]);
431        assert_eq!(store.list_services().unwrap(), vec!["some_service"]);
432
433        // Test that secret store is consistent
434        let export2_config = config::Export {
435            local_addr: "127.0.0.1:4566".to_string(),
436            service_name: Some("some_service".to_string()),
437            remote_ports: vec![4567],
438        };
439        let export2: Export = (export2_config, Some(&mut store)).try_into().unwrap();
440        assert_eq!(export.remote_key, export2.remote_key);
441        assert_eq!(store.list_services().unwrap(), vec!["some_service"]);
442    }
443
444    #[test]
445    fn try_into_export_new_onion() {
446        let export_config = config::Export {
447            local_addr: "127.0.0.1:4566".to_string(),
448            service_name: None,
449            remote_ports: vec![4567],
450        };
451        let tmp_dir = tempfile::tempdir().unwrap();
452        let secrets_dir = tmp_dir.path().join("secrets");
453        let mut store = secrets::SecretStore::new(secrets_dir.to_str().unwrap());
454
455        let export: Export = (export_config, Some(&mut store)).try_into().unwrap();
456        assert_eq!("127.0.0.1:4566".parse(), Ok(export.local_addr));
457        assert_eq!(
458            export
459                .remote_key
460                .public()
461                .get_onion_address()
462                .get_address_without_dot_onion()
463                .as_str()
464                .len(),
465            56
466        );
467        assert_eq!(export.remote_ports, vec![4567]);
468    }
469
470    #[test]
471    fn try_into_export_unix() {
472        let export_config = config::Export {
473            local_addr: "unix:/tmp/foo.sock".to_string(),
474            service_name: None,
475            remote_ports: vec![4567],
476        };
477        let tmp_dir = tempfile::tempdir().unwrap();
478        let secrets_dir = tmp_dir.path().join("secrets");
479        let mut store = secrets::SecretStore::new(secrets_dir.to_str().unwrap());
480
481        let result: Result<Export> = (export_config, Some(&mut store)).try_into();
482        // TODO: Improve torut to support local unix sockets.
483        assert!(matches!(result, Err(PipeError::ParseAddr(_))));
484    }
485}