tor_interface/
arti_client_tor_client.rs

1// standard
2use std::net::SocketAddr;
3use std::ops::DerefMut;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8
9//extern
10use arti_client::config::{CfgPath, TorClientConfigBuilder};
11use arti_client::{BootstrapBehavior, DangerouslyIntoTorAddr, IntoTorAddr, TorClient};
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::runtime;
15use tokio_stream::StreamExt;
16use tor_cell::relaycell::msg::Connected;
17use tor_config::ExplicitOrAuto;
18use tor_hsservice::config::restricted_discovery::HsClientNickname;
19use tor_hsservice::config::OnionServiceConfigBuilder;
20use tor_hsservice::{HsNickname, RunningOnionService};
21use tor_keymgr::{config::ArtiKeystoreKind, KeystoreSelector};
22use tor_llcrypto::pk::ed25519::ExpandedKeypair;
23use tor_proto::stream::IncomingStreamRequest;
24use tor_rtcompat::PreferredRuntime;
25
26// internal crates
27use crate::tor_crypto::*;
28use crate::tor_provider;
29use crate::tor_provider::*;
30
31/// [`ArtiClientTorClient`]-specific error type
32#[derive(thiserror::Error, Debug)]
33pub enum Error {
34    #[error("not implemented")]
35    NotImplemented(),
36
37    #[error("unable to bind TCP listener")]
38    TcpListenerBindFailed(#[source] std::io::Error),
39
40    #[error("unable to get TCP listener's local address")]
41    TcpListenerLocalAddrFailed(#[source] std::io::Error),
42
43    #[error("unable to accept connection on TCP Listener")]
44    TcpListenerAcceptFailed(#[source] std::io::Error),
45
46    #[error("unable to connect to TCP listener")]
47    TcpStreamConnectFailed(#[source] std::io::Error),
48
49    #[error("unable to convert tokio::TcpStream to std::net::TcpStream")]
50    TcpStreamIntoFailed(#[source] std::io::Error),
51
52    #[error("arti-client config-builder error: {0}")]
53    ArtiClientConfigBuilderError(#[source] arti_client::config::ConfigBuildError),
54
55    #[error("arti-client error: {0}")]
56    ArtiClientError(#[source] arti_client::Error),
57
58    #[error("arti-client tor-addr error: {0}")]
59    ArtiClientTorAddrError(#[source] arti_client::TorAddrError),
60
61    #[error("arti-client onion-service startup error: {0}")]
62    ArtiClientOnionServiceLaunchError(#[source] arti_client::Error),
63
64    #[error("tor-keymgr error: {0}")]
65    TorKeyMgrError(#[source] tor_keymgr::Error),
66
67    #[error("onion-service config-builder error: {0}")]
68    OnionServiceConfigBuilderError(#[source] tor_config::ConfigBuildError),
69}
70
71impl From<Error> for crate::tor_provider::Error {
72    fn from(error: Error) -> Self {
73        crate::tor_provider::Error::Generic(error.to_string())
74    }
75}
76
/// The `ArtiClientTorClient` is an in-process [`arti-client`](https://crates.io/crates/arti-client)-based [`TorProvider`].
///
/// Async work is run on the supplied Tokio runtime; results are queued as
/// [`TorEvent`]s and handed to the caller when `update()` is called.
pub struct ArtiClientTorClient {
    // runtime used to block on and spawn this provider's async operations
    tokio_runtime: Arc<runtime::Runtime>,
    // the underlying arti client; cloned into each spawned task
    arti_client: TorClient<PreferredRuntime>,
    // events produced by background tasks, drained by `update()`
    pending_events: Arc<Mutex<Vec<TorEvent>>>,
    // set to true once `arti_client.bootstrap()` has succeeded
    bootstrapped: Arc<AtomicBool>,
    // handle returned by the next `connect_async()` call; incremented per call
    next_connect_handle: ConnectHandle,
}
87
88// used to forward traffic to/from arti to local tcp streams
89async fn forward_stream<R, W>(alive: Arc<AtomicBool>, mut reader: R, mut writer: W)
90where
91    R: AsyncReadExt + Unpin,
92    W: AsyncWriteExt + Unpin,
93{
94    // allow 100ms timeout on reads to verify writer is still good
95    let read_timeout = std::time::Duration::from_millis(100);
96    // allow additional retries in the event the other half of the pump
97    // dies; keep pumping data until our read times out 3 times
98    let mut remaining_retries = 3;
99    let mut buf = [0u8; 1024];
100
101    loop {
102        if !alive.load(Ordering::Relaxed) && remaining_retries == 0 {
103            break;
104        }
105
106        tokio::select! {
107            count = reader.read(&mut buf) => match count {
108                // end of stream
109                Ok(0) => break,
110                // read N bytes
111                Ok(count) => {
112                    // forward traffic
113                    match writer.write_all(&buf[0..count]).await {
114                        Ok(()) => (),
115                        Err(_err) => break,
116                    }
117                    match writer.flush().await {
118                        Ok(()) => (),
119                        Err(_err) => break,
120                    }
121                },
122                // read failed
123                Err(_err) => break,
124            },
125            _ = tokio::time::sleep(read_timeout) => match writer.flush().await {
126                Ok(()) => {
127                    // so long as our writer and reader are good, we should
128                    // allow a few additional data pump attempts
129                    if !alive.load(Ordering::Relaxed) {
130                        remaining_retries -= 1;
131                    }
132                },
133                Err(_err) => break,
134            }
135        }
136    }
137    // signal pump death
138    alive.store(false, Ordering::Relaxed);
139}
140
141impl ArtiClientTorClient {
142    /// Construct a new `ArtiClientTorClient` which uses a [Tokio](https://crates.io/crates/tokio) runtime internally for all async operations.
143    pub fn new(
144        tokio_runtime: Arc<runtime::Runtime>,
145        root_data_directory: &Path,
146    ) -> Result<Self, Error> {
147        // set custom config options
148        let mut config_builder: TorClientConfigBuilder = Default::default();
149
150        // manually set arti cache and data directories so we can have
151        // multiple concurrent instances and control where it writes
152        let mut cache_dir = PathBuf::from(root_data_directory);
153        cache_dir.push("cache");
154        config_builder
155            .storage()
156            .cache_dir(CfgPath::new_literal(cache_dir))
157            .keystore()
158            .primary()
159            .kind(ExplicitOrAuto::Explicit(ArtiKeystoreKind::Ephemeral));
160
161        let mut state_dir = PathBuf::from(root_data_directory);
162        state_dir.push("state");
163        config_builder
164            .storage()
165            .state_dir(CfgPath::new_literal(state_dir));
166
167        // disable access to clearnet addresses and enable access to onion services
168        config_builder
169            .address_filter()
170            .allow_local_addrs(false)
171            .allow_onion_addrs(true);
172
173        let config = match config_builder.build() {
174            Ok(config) => config,
175            Err(err) => return Err(Error::ArtiClientConfigBuilderError(err)),
176        };
177
178        let arti_client = tokio_runtime.block_on(async {
179            TorClient::builder()
180                .config(config)
181                .bootstrap_behavior(BootstrapBehavior::Manual)
182                .create_unbootstrapped()
183                .map_err(Error::ArtiClientError)
184
185            // TODO: implement TorEvent::LogReceived events once upstream issue is resolved:
186            // https://gitlab.torproject.org/tpo/core/arti/-/issues/1356
187        })?;
188
189        let pending_events = std::vec![TorEvent::LogReceived {
190            line: "Starting arti-client TorProvider".to_string()
191        }];
192        let pending_events = Arc::new(Mutex::new(pending_events));
193
194        Ok(Self {
195            tokio_runtime,
196            arti_client,
197            pending_events,
198            bootstrapped: Arc::new(AtomicBool::new(false)),
199            next_connect_handle: Default::default(),
200        })
201    }
202
203    async fn connect_impl(
204        target_addr: TargetAddr,
205        arti_client: TorClient<PreferredRuntime>,
206    ) -> Result<std::net::TcpStream, tor_provider::Error> {
207        // convert TargetAddr to TorAddr
208        let arti_target = match target_addr.clone() {
209            TargetAddr::Socket(socket_addr) => socket_addr.into_tor_addr_dangerously(),
210            TargetAddr::Domain(domain_addr) => {
211                (domain_addr.domain(), domain_addr.port()).into_tor_addr()
212            }
213            TargetAddr::OnionService(OnionAddr::V3(OnionAddrV3 {
214                service_id,
215                virt_port,
216            })) => (format!("{}.onion", service_id), virt_port).into_tor_addr(),
217        }
218        .map_err(Error::ArtiClientTorAddrError)?;
219
220        // connect to target
221        let data_stream = arti_client
222            .connect(arti_target)
223            .await
224            .map_err(Error::ArtiClientError)?;
225
226        // start a task to forward traffic from returned data stream
227        // and tcp socket
228        let (data_reader, data_writer) = data_stream.split();
229
230        // try to bind to a local address, let OS pick our port
231        let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
232        let server_listener = TcpListener::bind(socket_addr)
233            .await
234            .map_err(Error::TcpListenerBindFailed)?;
235        // await future after a client connects
236        let server_accept_future = server_listener.accept();
237        let socket_addr = server_listener
238            .local_addr()
239            .map_err(Error::TcpListenerLocalAddrFailed)?;
240
241        // client stream will ultimatley be returned from connect_impl()
242        let client_stream = TcpStream::connect(socket_addr)
243            .await
244            .map_err(Error::TcpStreamConnectFailed)?;
245        // client has connected so now get the server's tcp stream
246        let (server_stream, _socket_addr) = server_accept_future
247            .await
248            .map_err(Error::TcpListenerAcceptFailed)?;
249        let (tcp_reader, tcp_writer) = server_stream.into_split();
250
251        // now spawn new tasks to forward traffic to/from local listener
252        let pump_alive = Arc::new(AtomicBool::new(true));
253        tokio::task::spawn({
254            let pump_alive = pump_alive.clone();
255            async move {
256                forward_stream(pump_alive, tcp_reader, data_writer).await;
257            }
258        });
259        tokio::task::spawn(async move {
260            forward_stream(pump_alive, data_reader, tcp_writer).await;
261        });
262        let client_stream = client_stream
263            .into_std()
264            .map_err(Error::TcpStreamIntoFailed)?;
265        Ok::<std::net::TcpStream, tor_provider::Error>(client_stream)
266    }
267}
268
269impl TorProvider for ArtiClientTorClient {
270    fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
271        std::thread::sleep(std::time::Duration::from_millis(16));
272        match self.pending_events.lock() {
273            Ok(mut pending_events) => Ok(std::mem::take(pending_events.deref_mut())),
274            Err(_) => {
275                unreachable!("another thread panicked while holding this pending_events mutex")
276            }
277        }
278    }
279
280    fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
281        // save progress events
282        let mut bootstrap_events = self.arti_client.bootstrap_events();
283        let pending_events = self.pending_events.clone();
284        let bootstrapped = self.bootstrapped.clone();
285        self.tokio_runtime.spawn(async move {
286            while let Some(evt) = bootstrap_events.next().await {
287                if bootstrapped.load(Ordering::Relaxed) {
288                    break;
289                }
290                match pending_events.lock() {
291                    Ok(mut pending_events) => {
292                        pending_events.push(TorEvent::BootstrapStatus {
293                            progress: (evt.as_frac().clamp(0.0f32, 1.0f32) * 100f32) as u32,
294                            tag: "no-tag".to_string(),
295                            summary: "no summary".to_string(),
296                        });
297                        // TODO: properly handle evt.blocked() with a new TorEvent::Error or something
298                    }
299                    Err(_) => unreachable!(
300                        "another thread panicked while holding this pending_events mutex"
301                    ),
302                }
303            }
304        });
305
306        // initiate bootstrap
307        let arti_client = self.arti_client.clone();
308        let pending_events = self.pending_events.clone();
309        let bootstrapped = self.bootstrapped.clone();
310        self.tokio_runtime.spawn(async move {
311            match arti_client.bootstrap().await {
312                Ok(()) => match pending_events.lock() {
313                    Ok(mut pending_events) => {
314                        pending_events.push(TorEvent::BootstrapStatus {
315                            progress: 100,
316                            tag: "no-tag".to_string(),
317                            summary: "no summary".to_string(),
318                        });
319                        pending_events.push(TorEvent::BootstrapComplete);
320                        bootstrapped.store(true, Ordering::Relaxed);
321                    }
322                    Err(_) => unreachable!(
323                        "another thread panicked while holding this pending_events mutex"
324                    ),
325                },
326                Err(_err) => {
327                    // TODO: add an error event to TorEvent
328                }
329            }
330        });
331
332        Ok(())
333    }
334
335    fn add_client_auth(
336        &mut self,
337        service_id: &V3OnionServiceId,
338        client_auth: &X25519PrivateKey,
339    ) -> Result<(), tor_provider::Error> {
340        let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
341        let hs_id = *ed25519_public.as_bytes();
342
343        self.arti_client
344            .insert_service_discovery_key(
345                KeystoreSelector::Primary,
346                hs_id.into(),
347                client_auth.inner().clone().into(),
348            )
349            .map_err(Error::ArtiClientError)?;
350
351        Ok(())
352    }
353
354    fn remove_client_auth(
355        &mut self,
356        service_id: &V3OnionServiceId,
357    ) -> Result<(), tor_provider::Error> {
358        let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
359        let hs_id = *ed25519_public.as_bytes();
360
361        self.arti_client
362            .remove_service_discovery_key(KeystoreSelector::Primary, hs_id.into())
363            .map_err(Error::ArtiClientError)?;
364
365        Ok(())
366    }
367
368    fn connect(
369        &mut self,
370        target: TargetAddr,
371        circuit: Option<CircuitToken>,
372    ) -> Result<OnionStream, tor_provider::Error> {
373        // stream isolation not implemented yet
374        if circuit.is_some() {
375            return Err(Error::NotImplemented().into());
376        }
377
378        let arti_client = self.arti_client.clone();
379        let stream = self.tokio_runtime.block_on({
380            let target = target.clone();
381            async move { Self::connect_impl(target, arti_client).await }
382        })?;
383        Ok(OnionStream {
384            stream,
385            local_addr: None,
386            peer_addr: Some(target),
387        })
388    }
389
390    fn connect_async(
391        &mut self,
392        target: TargetAddr,
393        circuit: Option<CircuitToken>,
394    ) -> Result<ConnectHandle, tor_provider::Error> {
395        // stream isolation not implemented yet
396        if circuit.is_some() {
397            return Err(Error::NotImplemented().into());
398        }
399
400        let handle = self.next_connect_handle;
401        self.next_connect_handle += 1usize;
402
403        let arti_client = self.arti_client.clone();
404        let pending_events = Arc::downgrade(&self.pending_events);
405
406        self.tokio_runtime.spawn(async move {
407            let stream = Self::connect_impl(target.clone(), arti_client).await;
408            if let Some(pending_events) = pending_events.upgrade() {
409                let event = match stream {
410                    Ok(stream) => {
411                        let stream = OnionStream {
412                            stream,
413                            local_addr: None,
414                            peer_addr: Some(target),
415                        };
416                        TorEvent::ConnectComplete { handle, stream }
417                    }
418                    Err(error) => TorEvent::ConnectFailed { handle, error },
419                };
420                let mut pending_events = pending_events
421                    .lock()
422                    .expect("pending_events mutex poisoned");
423                pending_events.push(event);
424            }
425        });
426
427        Ok(handle)
428    }
429
430    fn listener(
431        &mut self,
432        private_key: &Ed25519PrivateKey,
433        virt_port: u16,
434        authorized_clients: Option<&[X25519PublicKey]>,
435    ) -> Result<OnionListener, tor_provider::Error> {
436        // try to bind to a local address, let OS pick our port
437        let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
438        // TODO: make this one async too
439        let listener =
440            std::net::TcpListener::bind(socket_addr).map_err(Error::TcpListenerBindFailed)?;
441        let socket_addr = listener
442            .local_addr()
443            .map_err(Error::TcpListenerLocalAddrFailed)?;
444
445        // generate a nickname to identify this onion service
446        let service_id = V3OnionServiceId::from_private_key(private_key);
447        let hs_nickname = match HsNickname::new(service_id.to_string()) {
448            Ok(nickname) => nickname,
449            Err(_) => {
450                panic!("v3 onion service id string representation should be a valid HsNickname")
451            }
452        };
453        // generate a new HsIdKeypair (from an Ed25519PrivateKey)
454        // clone() isn't implemented for ExpandedKeypair >:[
455        let secret_key_bytes = private_key.inner().to_secret_key_bytes();
456        let hs_id_keypair = ExpandedKeypair::from_secret_key_bytes(secret_key_bytes).unwrap();
457
458        // create an OnionServiceConfig with the ephemeral nickname
459        let mut onion_service_config_builder = OnionServiceConfigBuilder::default();
460        onion_service_config_builder.nickname(hs_nickname);
461
462        // add authorised client keys if they exist
463        if let Some(authorized_clients) = authorized_clients {
464            if !authorized_clients.is_empty() {
465                let restricted_discovery_config =
466                    onion_service_config_builder.restricted_discovery();
467                restricted_discovery_config.enabled(true);
468
469                for (i, key) in authorized_clients.iter().enumerate() {
470                    let nickname = format!("client_{i}");
471                    restricted_discovery_config.static_keys().access().push((
472                        HsClientNickname::from_str(nickname.as_str()).unwrap(),
473                        (*key.inner()).into(),
474                    ));
475                }
476            }
477        }
478
479        let onion_service_config = match onion_service_config_builder.build() {
480            Ok(onion_service_config) => onion_service_config,
481            Err(err) => Err(Error::OnionServiceConfigBuilderError(err))?,
482        };
483
484        let (onion_service, mut rend_requests) = self
485            .arti_client
486            .launch_onion_service_with_hsid(onion_service_config, hs_id_keypair.into())
487            .map_err(Error::ArtiClientOnionServiceLaunchError)?;
488
489        // start a task to signal onion service published
490        let pending_events = self.pending_events.clone();
491        let mut status_events = onion_service.status_events();
492        let service_id_clone = service_id.clone();
493
494        self.tokio_runtime.spawn(async move {
495            while let Some(evt) = status_events.next().await {
496                if evt.state() == tor_hsservice::status::State::Running {
497                    match pending_events.lock() {
498                        Ok(mut pending_events) => {
499                            pending_events.push(TorEvent::OnionServicePublished {
500                                service_id: service_id_clone,
501                            });
502                            return;
503                        }
504                        Err(_) => unreachable!(
505                            "another thread panicked while holding this pending_events mutex"
506                        ),
507                    }
508                }
509            }
510        });
511
512        // start a task which accepts every RendRequest to get a StreamRequest
513        self.tokio_runtime.spawn(async move {
514            while let Some(request) = rend_requests.next().await {
515                let mut stream_requests = match request.accept().await {
516                    Ok(stream_requests) => stream_requests,
517                    // TODO: probably not our problem?
518                    _ => return,
519                };
520                // spawn a new task to consume the stream requsts
521                tokio::task::spawn(async move {
522                    while let Some(stream_request) = stream_requests.next().await {
523                        let should_accept =
524                            if let IncomingStreamRequest::Begin(begin) = stream_request.request() {
525                                // we only accept connections on the virt port
526                                begin.port() == virt_port
527                            } else {
528                                false
529                            };
530
531                        if should_accept {
532                            let data_stream =
533                                match stream_request.accept(Connected::new_empty()).await {
534                                    Ok(data_stream) => data_stream,
535                                    // TODO: probably not our problem
536                                    _ => continue,
537                                };
538                            let (data_reader, data_writer) = data_stream.split();
539
540                            let (tcp_reader, tcp_writer) =
541                                match TcpStream::connect(socket_addr).await {
542                                    Ok(tcp_stream) => tcp_stream.into_split(),
543                                    // TODO: possibly our problem?
544                                    _ => continue,
545                                };
546                            // now spawn new tasks to forward traffic to/from the onion listener
547
548                            let pump_alive = Arc::new(AtomicBool::new(true));
549                            // read from connected client and write to local socket
550                            tokio::task::spawn({
551                                let pump_alive = pump_alive.clone();
552                                async move {
553                                    forward_stream(pump_alive, data_reader, tcp_writer).await;
554                                }
555                            });
556                            // read from local socket and write to connected client
557                            tokio::task::spawn(async move {
558                                forward_stream(pump_alive, tcp_reader, data_writer).await;
559                            });
560                        } else {
561                            // either requesting the wrong port or the wrong type of stream request
562                            let _ = stream_request.shutdown_circuit();
563                        }
564                    }
565                });
566            }
567        });
568
569        let onion_addr = OnionAddr::V3(OnionAddrV3::new(service_id, virt_port));
570        // onion-service is torn down when `onion_service` is dropped
571        Ok(OnionListener::new::<Arc<RunningOnionService>>(
572            listener,
573            onion_addr,
574            onion_service,
575            |_| {},
576        ))
577    }
578
579    fn generate_token(&mut self) -> CircuitToken {
580        0usize
581    }
582
583    fn release_token(&mut self, _token: CircuitToken) {}
584}