1use std::net::SocketAddr;
3use std::ops::DerefMut;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8
9use arti_client::config::{CfgPath, TorClientConfigBuilder};
11use arti_client::{BootstrapBehavior, DangerouslyIntoTorAddr, IntoTorAddr, TorClient};
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::runtime;
15use tokio_stream::StreamExt;
16use tor_cell::relaycell::msg::Connected;
17use tor_config::ExplicitOrAuto;
18use tor_hsservice::config::restricted_discovery::HsClientNickname;
19use tor_hsservice::config::OnionServiceConfigBuilder;
20use tor_hsservice::{HsNickname, RunningOnionService};
21use tor_keymgr::{config::ArtiKeystoreKind, KeystoreSelector};
22use tor_llcrypto::pk::ed25519::ExpandedKeypair;
23use tor_proto::stream::IncomingStreamRequest;
24use tor_rtcompat::PreferredRuntime;
25
26use crate::tor_crypto::*;
28use crate::tor_provider;
29use crate::tor_provider::*;
30
31#[derive(thiserror::Error, Debug)]
33pub enum Error {
34 #[error("not implemented")]
35 NotImplemented(),
36
37 #[error("unable to bind TCP listener")]
38 TcpListenerBindFailed(#[source] std::io::Error),
39
40 #[error("unable to get TCP listener's local address")]
41 TcpListenerLocalAddrFailed(#[source] std::io::Error),
42
43 #[error("unable to accept connection on TCP Listener")]
44 TcpListenerAcceptFailed(#[source] std::io::Error),
45
46 #[error("unable to connect to TCP listener")]
47 TcpStreamConnectFailed(#[source] std::io::Error),
48
49 #[error("unable to convert tokio::TcpStream to std::net::TcpStream")]
50 TcpStreamIntoFailed(#[source] std::io::Error),
51
52 #[error("arti-client config-builder error: {0}")]
53 ArtiClientConfigBuilderError(#[source] arti_client::config::ConfigBuildError),
54
55 #[error("arti-client error: {0}")]
56 ArtiClientError(#[source] arti_client::Error),
57
58 #[error("arti-client tor-addr error: {0}")]
59 ArtiClientTorAddrError(#[source] arti_client::TorAddrError),
60
61 #[error("arti-client onion-service startup error: {0}")]
62 ArtiClientOnionServiceLaunchError(#[source] arti_client::Error),
63
64 #[error("tor-keymgr error: {0}")]
65 TorKeyMgrError(#[source] tor_keymgr::Error),
66
67 #[error("onion-service config-builder error: {0}")]
68 OnionServiceConfigBuilderError(#[source] tor_config::ConfigBuildError),
69}
70
71impl From<Error> for crate::tor_provider::Error {
72 fn from(error: Error) -> Self {
73 crate::tor_provider::Error::Generic(error.to_string())
74 }
75}
76
77pub struct ArtiClientTorClient {
81 tokio_runtime: Arc<runtime::Runtime>,
82 arti_client: TorClient<PreferredRuntime>,
83 pending_events: Arc<Mutex<Vec<TorEvent>>>,
84 bootstrapped: Arc<AtomicBool>,
85 next_connect_handle: ConnectHandle,
86}
87
88async fn forward_stream<R, W>(alive: Arc<AtomicBool>, mut reader: R, mut writer: W)
90where
91 R: AsyncReadExt + Unpin,
92 W: AsyncWriteExt + Unpin,
93{
94 let read_timeout = std::time::Duration::from_millis(100);
96 let mut remaining_retries = 3;
99 let mut buf = [0u8; 1024];
100
101 loop {
102 if !alive.load(Ordering::Relaxed) && remaining_retries == 0 {
103 break;
104 }
105
106 tokio::select! {
107 count = reader.read(&mut buf) => match count {
108 Ok(0) => break,
110 Ok(count) => {
112 match writer.write_all(&buf[0..count]).await {
114 Ok(()) => (),
115 Err(_err) => break,
116 }
117 match writer.flush().await {
118 Ok(()) => (),
119 Err(_err) => break,
120 }
121 },
122 Err(_err) => break,
124 },
125 _ = tokio::time::sleep(read_timeout) => match writer.flush().await {
126 Ok(()) => {
127 if !alive.load(Ordering::Relaxed) {
130 remaining_retries -= 1;
131 }
132 },
133 Err(_err) => break,
134 }
135 }
136 }
137 alive.store(false, Ordering::Relaxed);
139}
140
141impl ArtiClientTorClient {
142 pub fn new(
144 tokio_runtime: Arc<runtime::Runtime>,
145 root_data_directory: &Path,
146 ) -> Result<Self, Error> {
147 let mut config_builder: TorClientConfigBuilder = Default::default();
149
150 let mut cache_dir = PathBuf::from(root_data_directory);
153 cache_dir.push("cache");
154 config_builder
155 .storage()
156 .cache_dir(CfgPath::new_literal(cache_dir))
157 .keystore()
158 .primary()
159 .kind(ExplicitOrAuto::Explicit(ArtiKeystoreKind::Ephemeral));
160
161 let mut state_dir = PathBuf::from(root_data_directory);
162 state_dir.push("state");
163 config_builder
164 .storage()
165 .state_dir(CfgPath::new_literal(state_dir));
166
167 config_builder
169 .address_filter()
170 .allow_local_addrs(false)
171 .allow_onion_addrs(true);
172
173 let config = match config_builder.build() {
174 Ok(config) => config,
175 Err(err) => return Err(Error::ArtiClientConfigBuilderError(err)),
176 };
177
178 let arti_client = tokio_runtime.block_on(async {
179 TorClient::builder()
180 .config(config)
181 .bootstrap_behavior(BootstrapBehavior::Manual)
182 .create_unbootstrapped()
183 .map_err(Error::ArtiClientError)
184
185 })?;
188
189 let pending_events = std::vec![TorEvent::LogReceived {
190 line: "Starting arti-client TorProvider".to_string()
191 }];
192 let pending_events = Arc::new(Mutex::new(pending_events));
193
194 Ok(Self {
195 tokio_runtime,
196 arti_client,
197 pending_events,
198 bootstrapped: Arc::new(AtomicBool::new(false)),
199 next_connect_handle: Default::default(),
200 })
201 }
202
203 async fn connect_impl(
204 target_addr: TargetAddr,
205 arti_client: TorClient<PreferredRuntime>,
206 ) -> Result<std::net::TcpStream, tor_provider::Error> {
207 let arti_target = match target_addr.clone() {
209 TargetAddr::Socket(socket_addr) => socket_addr.into_tor_addr_dangerously(),
210 TargetAddr::Domain(domain_addr) => {
211 (domain_addr.domain(), domain_addr.port()).into_tor_addr()
212 }
213 TargetAddr::OnionService(OnionAddr::V3(OnionAddrV3 {
214 service_id,
215 virt_port,
216 })) => (format!("{}.onion", service_id), virt_port).into_tor_addr(),
217 }
218 .map_err(Error::ArtiClientTorAddrError)?;
219
220 let data_stream = arti_client
222 .connect(arti_target)
223 .await
224 .map_err(Error::ArtiClientError)?;
225
226 let (data_reader, data_writer) = data_stream.split();
229
230 let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
232 let server_listener = TcpListener::bind(socket_addr)
233 .await
234 .map_err(Error::TcpListenerBindFailed)?;
235 let server_accept_future = server_listener.accept();
237 let socket_addr = server_listener
238 .local_addr()
239 .map_err(Error::TcpListenerLocalAddrFailed)?;
240
241 let client_stream = TcpStream::connect(socket_addr)
243 .await
244 .map_err(Error::TcpStreamConnectFailed)?;
245 let (server_stream, _socket_addr) = server_accept_future
247 .await
248 .map_err(Error::TcpListenerAcceptFailed)?;
249 let (tcp_reader, tcp_writer) = server_stream.into_split();
250
251 let pump_alive = Arc::new(AtomicBool::new(true));
253 tokio::task::spawn({
254 let pump_alive = pump_alive.clone();
255 async move {
256 forward_stream(pump_alive, tcp_reader, data_writer).await;
257 }
258 });
259 tokio::task::spawn(async move {
260 forward_stream(pump_alive, data_reader, tcp_writer).await;
261 });
262 let client_stream = client_stream
263 .into_std()
264 .map_err(Error::TcpStreamIntoFailed)?;
265 Ok::<std::net::TcpStream, tor_provider::Error>(client_stream)
266 }
267}
268
269impl TorProvider for ArtiClientTorClient {
270 fn update(&mut self) -> Result<Vec<TorEvent>, tor_provider::Error> {
271 std::thread::sleep(std::time::Duration::from_millis(16));
272 match self.pending_events.lock() {
273 Ok(mut pending_events) => Ok(std::mem::take(pending_events.deref_mut())),
274 Err(_) => {
275 unreachable!("another thread panicked while holding this pending_events mutex")
276 }
277 }
278 }
279
280 fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {
281 let mut bootstrap_events = self.arti_client.bootstrap_events();
283 let pending_events = self.pending_events.clone();
284 let bootstrapped = self.bootstrapped.clone();
285 self.tokio_runtime.spawn(async move {
286 while let Some(evt) = bootstrap_events.next().await {
287 if bootstrapped.load(Ordering::Relaxed) {
288 break;
289 }
290 match pending_events.lock() {
291 Ok(mut pending_events) => {
292 pending_events.push(TorEvent::BootstrapStatus {
293 progress: (evt.as_frac().clamp(0.0f32, 1.0f32) * 100f32) as u32,
294 tag: "no-tag".to_string(),
295 summary: "no summary".to_string(),
296 });
297 }
299 Err(_) => unreachable!(
300 "another thread panicked while holding this pending_events mutex"
301 ),
302 }
303 }
304 });
305
306 let arti_client = self.arti_client.clone();
308 let pending_events = self.pending_events.clone();
309 let bootstrapped = self.bootstrapped.clone();
310 self.tokio_runtime.spawn(async move {
311 match arti_client.bootstrap().await {
312 Ok(()) => match pending_events.lock() {
313 Ok(mut pending_events) => {
314 pending_events.push(TorEvent::BootstrapStatus {
315 progress: 100,
316 tag: "no-tag".to_string(),
317 summary: "no summary".to_string(),
318 });
319 pending_events.push(TorEvent::BootstrapComplete);
320 bootstrapped.store(true, Ordering::Relaxed);
321 }
322 Err(_) => unreachable!(
323 "another thread panicked while holding this pending_events mutex"
324 ),
325 },
326 Err(_err) => {
327 }
329 }
330 });
331
332 Ok(())
333 }
334
335 fn add_client_auth(
336 &mut self,
337 service_id: &V3OnionServiceId,
338 client_auth: &X25519PrivateKey,
339 ) -> Result<(), tor_provider::Error> {
340 let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
341 let hs_id = *ed25519_public.as_bytes();
342
343 self.arti_client
344 .insert_service_discovery_key(
345 KeystoreSelector::Primary,
346 hs_id.into(),
347 client_auth.inner().clone().into(),
348 )
349 .map_err(Error::ArtiClientError)?;
350
351 Ok(())
352 }
353
354 fn remove_client_auth(
355 &mut self,
356 service_id: &V3OnionServiceId,
357 ) -> Result<(), tor_provider::Error> {
358 let ed25519_public = Ed25519PublicKey::from_service_id(service_id).unwrap();
359 let hs_id = *ed25519_public.as_bytes();
360
361 self.arti_client
362 .remove_service_discovery_key(KeystoreSelector::Primary, hs_id.into())
363 .map_err(Error::ArtiClientError)?;
364
365 Ok(())
366 }
367
368 fn connect(
369 &mut self,
370 target: TargetAddr,
371 circuit: Option<CircuitToken>,
372 ) -> Result<OnionStream, tor_provider::Error> {
373 if circuit.is_some() {
375 return Err(Error::NotImplemented().into());
376 }
377
378 let arti_client = self.arti_client.clone();
379 let stream = self.tokio_runtime.block_on({
380 let target = target.clone();
381 async move { Self::connect_impl(target, arti_client).await }
382 })?;
383 Ok(OnionStream {
384 stream,
385 local_addr: None,
386 peer_addr: Some(target),
387 })
388 }
389
390 fn connect_async(
391 &mut self,
392 target: TargetAddr,
393 circuit: Option<CircuitToken>,
394 ) -> Result<ConnectHandle, tor_provider::Error> {
395 if circuit.is_some() {
397 return Err(Error::NotImplemented().into());
398 }
399
400 let handle = self.next_connect_handle;
401 self.next_connect_handle += 1usize;
402
403 let arti_client = self.arti_client.clone();
404 let pending_events = Arc::downgrade(&self.pending_events);
405
406 self.tokio_runtime.spawn(async move {
407 let stream = Self::connect_impl(target.clone(), arti_client).await;
408 if let Some(pending_events) = pending_events.upgrade() {
409 let event = match stream {
410 Ok(stream) => {
411 let stream = OnionStream {
412 stream,
413 local_addr: None,
414 peer_addr: Some(target),
415 };
416 TorEvent::ConnectComplete { handle, stream }
417 }
418 Err(error) => TorEvent::ConnectFailed { handle, error },
419 };
420 let mut pending_events = pending_events
421 .lock()
422 .expect("pending_events mutex poisoned");
423 pending_events.push(event);
424 }
425 });
426
427 Ok(handle)
428 }
429
430 fn listener(
431 &mut self,
432 private_key: &Ed25519PrivateKey,
433 virt_port: u16,
434 authorized_clients: Option<&[X25519PublicKey]>,
435 ) -> Result<OnionListener, tor_provider::Error> {
436 let socket_addr = SocketAddr::from(([127, 0, 0, 1], 0u16));
438 let listener =
440 std::net::TcpListener::bind(socket_addr).map_err(Error::TcpListenerBindFailed)?;
441 let socket_addr = listener
442 .local_addr()
443 .map_err(Error::TcpListenerLocalAddrFailed)?;
444
445 let service_id = V3OnionServiceId::from_private_key(private_key);
447 let hs_nickname = match HsNickname::new(service_id.to_string()) {
448 Ok(nickname) => nickname,
449 Err(_) => {
450 panic!("v3 onion service id string representation should be a valid HsNickname")
451 }
452 };
453 let secret_key_bytes = private_key.inner().to_secret_key_bytes();
456 let hs_id_keypair = ExpandedKeypair::from_secret_key_bytes(secret_key_bytes).unwrap();
457
458 let mut onion_service_config_builder = OnionServiceConfigBuilder::default();
460 onion_service_config_builder.nickname(hs_nickname);
461
462 if let Some(authorized_clients) = authorized_clients {
464 if !authorized_clients.is_empty() {
465 let restricted_discovery_config =
466 onion_service_config_builder.restricted_discovery();
467 restricted_discovery_config.enabled(true);
468
469 for (i, key) in authorized_clients.iter().enumerate() {
470 let nickname = format!("client_{i}");
471 restricted_discovery_config.static_keys().access().push((
472 HsClientNickname::from_str(nickname.as_str()).unwrap(),
473 (*key.inner()).into(),
474 ));
475 }
476 }
477 }
478
479 let onion_service_config = match onion_service_config_builder.build() {
480 Ok(onion_service_config) => onion_service_config,
481 Err(err) => Err(Error::OnionServiceConfigBuilderError(err))?,
482 };
483
484 let (onion_service, mut rend_requests) = self
485 .arti_client
486 .launch_onion_service_with_hsid(onion_service_config, hs_id_keypair.into())
487 .map_err(Error::ArtiClientOnionServiceLaunchError)?;
488
489 let pending_events = self.pending_events.clone();
491 let mut status_events = onion_service.status_events();
492 let service_id_clone = service_id.clone();
493
494 self.tokio_runtime.spawn(async move {
495 while let Some(evt) = status_events.next().await {
496 if evt.state() == tor_hsservice::status::State::Running {
497 match pending_events.lock() {
498 Ok(mut pending_events) => {
499 pending_events.push(TorEvent::OnionServicePublished {
500 service_id: service_id_clone,
501 });
502 return;
503 }
504 Err(_) => unreachable!(
505 "another thread panicked while holding this pending_events mutex"
506 ),
507 }
508 }
509 }
510 });
511
512 self.tokio_runtime.spawn(async move {
514 while let Some(request) = rend_requests.next().await {
515 let mut stream_requests = match request.accept().await {
516 Ok(stream_requests) => stream_requests,
517 _ => return,
519 };
520 tokio::task::spawn(async move {
522 while let Some(stream_request) = stream_requests.next().await {
523 let should_accept =
524 if let IncomingStreamRequest::Begin(begin) = stream_request.request() {
525 begin.port() == virt_port
527 } else {
528 false
529 };
530
531 if should_accept {
532 let data_stream =
533 match stream_request.accept(Connected::new_empty()).await {
534 Ok(data_stream) => data_stream,
535 _ => continue,
537 };
538 let (data_reader, data_writer) = data_stream.split();
539
540 let (tcp_reader, tcp_writer) =
541 match TcpStream::connect(socket_addr).await {
542 Ok(tcp_stream) => tcp_stream.into_split(),
543 _ => continue,
545 };
546 let pump_alive = Arc::new(AtomicBool::new(true));
549 tokio::task::spawn({
551 let pump_alive = pump_alive.clone();
552 async move {
553 forward_stream(pump_alive, data_reader, tcp_writer).await;
554 }
555 });
556 tokio::task::spawn(async move {
558 forward_stream(pump_alive, tcp_reader, data_writer).await;
559 });
560 } else {
561 let _ = stream_request.shutdown_circuit();
563 }
564 }
565 });
566 }
567 });
568
569 let onion_addr = OnionAddr::V3(OnionAddrV3::new(service_id, virt_port));
570 Ok(OnionListener::new::<Arc<RunningOnionService>>(
572 listener,
573 onion_addr,
574 onion_service,
575 |_| {},
576 ))
577 }
578
579 fn generate_token(&mut self) -> CircuitToken {
580 0usize
581 }
582
583 fn release_token(&mut self, _token: CircuitToken) {}
584}