1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![allow(clippy::collapsible_if)] #![deny(clippy::unused_async)]
47pub mod config;
50pub mod err;
51
52#[cfg(feature = "managed-pts")]
53pub mod ipc;
54
55#[cfg(feature = "managed-pts")]
56mod managed;
57
58use crate::config::{TransportConfig, TransportOptions};
59use crate::err::PtError;
60use oneshot_fused_workaround as oneshot;
61use std::collections::HashMap;
62use std::net::SocketAddr;
63use std::path::PathBuf;
64use std::sync::{Arc, RwLock};
65use tor_config_path::CfgPathResolver;
66use tor_linkspec::PtTransportName;
67use tor_rtcompat::Runtime;
68use tor_socksproto::SocksVersion;
69#[cfg(any(feature = "tor-channel-factory", feature = "managed-pts"))]
70use tracing::info;
71use tracing::warn;
72#[cfg(feature = "managed-pts")]
73use {
74 crate::managed::{PtReactor, PtReactorMessage},
75 futures::channel::mpsc::{self, UnboundedSender},
76 tor_error::error_report,
77 tor_rtcompat::SpawnExt,
78};
79#[cfg(feature = "tor-channel-factory")]
80use {
81 async_trait::async_trait,
82 tor_chanmgr::{
83 builder::ChanBuilder,
84 factory::{AbstractPtError, ChannelFactory},
85 transport::ExternalProxyPlugin,
86 },
87 tracing::trace,
88};
89
90#[derive(Default, Debug)]
92struct PtSharedState {
93 #[allow(dead_code)]
97 managed_cmethods: HashMap<PtTransportName, PtClientMethod>,
98 configured: HashMap<PtTransportName, TransportOptions>,
100}
101
102pub struct PtMgr<R> {
105 #[allow(dead_code)]
107 runtime: R,
108 state: Arc<RwLock<PtSharedState>>,
110 #[cfg(feature = "managed-pts")]
112 tx: UnboundedSender<PtReactorMessage>,
113}
114
115impl<R: Runtime> PtMgr<R> {
116 fn transform_config(
118 binaries: Vec<TransportConfig>,
119 ) -> Result<HashMap<PtTransportName, TransportOptions>, tor_error::Bug> {
120 let mut ret = HashMap::new();
121 for thing in binaries {
126 for tn in thing.protocols.iter() {
127 ret.insert(tn.clone(), thing.clone().try_into()?);
128 }
129 }
130 for opt in ret.values() {
131 if let TransportOptions::Unmanaged(u) = opt {
132 if !u.is_localhost() {
133 warn!(
134 "Configured to connect to a PT on a non-local addresses. This is usually insecure! We recommend running PTs on localhost only."
135 );
136 }
137 }
138 }
139 Ok(ret)
140 }
141
142 pub fn new(
145 transports: Vec<TransportConfig>,
146 #[allow(unused)] state_dir: PathBuf,
147 path_resolver: Arc<CfgPathResolver>,
148 rt: R,
149 ) -> Result<Self, PtError> {
150 let state = PtSharedState {
151 managed_cmethods: Default::default(),
152 configured: Self::transform_config(transports)?,
153 };
154 let state = Arc::new(RwLock::new(state));
155
156 #[cfg(feature = "managed-pts")]
158 let tx = {
159 let (tx, rx) = mpsc::unbounded();
160
161 let mut reactor =
162 PtReactor::new(rt.clone(), state.clone(), rx, state_dir, path_resolver);
163 rt.spawn(async move {
164 loop {
165 match reactor.run_one_step().await {
166 Ok(true) => return,
167 Ok(false) => {}
168 Err(e) => {
169 error_report!(e, "PtReactor failed");
170 return;
171 }
172 }
173 }
174 })
175 .map_err(|e| PtError::Spawn { cause: Arc::new(e) })?;
176
177 tx
178 };
179
180 Ok(Self {
181 runtime: rt,
182 state,
183 #[cfg(feature = "managed-pts")]
184 tx,
185 })
186 }
187
188 pub fn reconfigure(
190 &self,
191 how: tor_config::Reconfigure,
192 transports: Vec<TransportConfig>,
193 ) -> Result<(), tor_config::ReconfigureError> {
194 let configured = Self::transform_config(transports)?;
195 if how == tor_config::Reconfigure::CheckAllOrNothing {
196 return Ok(());
197 }
198 {
199 let mut inner = self.state.write().expect("ptmgr poisoned");
200 inner.configured = configured;
201 }
202 #[cfg(feature = "managed-pts")]
205 let _ = self.tx.unbounded_send(PtReactorMessage::Reconfigured);
206 Ok(())
207 }
208
209 #[cfg(feature = "tor-channel-factory")]
215 async fn get_cmethod_for_transport(
216 &self,
217 transport: &PtTransportName,
218 ) -> Result<Option<PtClientMethod>, PtError> {
219 #[allow(unused)]
220 let (cfg, managed_cmethod) = {
221 let inner = self.state.read().expect("ptmgr poisoned");
225 let cfg = inner.configured.get(transport);
226 let managed_cmethod = inner.managed_cmethods.get(transport);
227 (cfg.cloned(), managed_cmethod.cloned())
228 };
229
230 match cfg {
231 Some(TransportOptions::Unmanaged(cfg)) => {
232 let cmethod = cfg.cmethod();
233 trace!(
234 "Found configured unmanaged transport {transport} accessible via {cmethod:?}"
235 );
236 Ok(Some(cmethod))
237 }
238 #[cfg(feature = "managed-pts")]
239 Some(TransportOptions::Managed(_cfg)) => {
240 match managed_cmethod {
241 Some(cmethod) => {
243 trace!(
244 "Found configured managed transport {transport} accessible via {cmethod:?}"
245 );
246 Ok(Some(cmethod))
247 }
248 None => {
250 Ok(Some(self.spawn_transport(transport).await?))
272 }
273 }
274 }
275 None => {
277 trace!("Got a request for transport {transport}, which is not configured.");
278 Ok(None)
279 }
280 }
281 }
282
283 #[cfg(all(feature = "tor-channel-factory", feature = "managed-pts"))]
285 async fn spawn_transport(
286 &self,
287 transport: &PtTransportName,
288 ) -> Result<PtClientMethod, PtError> {
289 info!(
292 "Got a request for transport {transport}, which is not currently running. Launching it."
293 );
294
295 let (tx, rx) = oneshot::channel();
296 self.tx
297 .unbounded_send(PtReactorMessage::Spawn {
298 pt: transport.clone(),
299 result: tx,
300 })
301 .map_err(|_| {
302 PtError::Internal(tor_error::internal!("PT reactor closed unexpectedly"))
303 })?;
304
305 let method = match rx.await {
306 Err(_) => {
307 return Err(PtError::Internal(tor_error::internal!(
308 "PT reactor closed unexpectedly"
309 )));
310 }
311 Ok(Err(e)) => {
312 warn!("PT for {transport} failed to launch: {e}");
313 return Err(e);
314 }
315 Ok(Ok(method)) => method,
316 };
317
318 info!("Successfully launched PT for {transport} at {method:?}.");
319 Ok(method)
320 }
321}
322
323#[derive(Debug, Clone, PartialEq, Eq)]
325pub struct PtClientMethod {
326 pub(crate) kind: SocksVersion,
328 pub(crate) endpoint: SocketAddr,
330}
331
332impl PtClientMethod {
333 pub fn kind(&self) -> SocksVersion {
335 self.kind
336 }
337
338 pub fn endpoint(&self) -> SocketAddr {
340 self.endpoint
341 }
342}
343
#[cfg(feature = "tor-channel-factory")]
#[async_trait]
impl<R: Runtime> tor_chanmgr::factory::AbstractPtMgr for PtMgr<R> {
    /// Build a channel factory that connects through `transport`.
    ///
    /// Yields `Ok(None)` when no client method is available for `transport` (that is,
    /// when it is not configured).  Otherwise the factory is a `ChanBuilder` wrapped
    /// around an `ExternalProxyPlugin` aimed at the transport's endpoint and SOCKS version.
    ///
    /// # Errors
    ///
    /// Any error from looking up (or launching) the transport is returned, wrapped in
    /// an `Arc`.
    async fn factory_for_transport(
        &self,
        transport: &PtTransportName,
    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
        let lookup = self.get_cmethod_for_transport(transport).await;
        let cmethod = match lookup {
            Ok(Some(found)) => found,
            Ok(None) => return Ok(None),
            Err(e) => return Err(Arc::new(e)),
        };

        let rt = &self.runtime;
        let proxy = ExternalProxyPlugin::new(rt.clone(), cmethod.endpoint(), cmethod.kind());
        let factory = ChanBuilder::new_client(rt.clone(), proxy);
        Ok(Some(Arc::new(factory)))
    }
}