Skip to main content

tor_ptmgr/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45#![allow(clippy::collapsible_if)] // See arti#2342
46#![deny(clippy::unused_async)]
47//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
48
49pub mod config;
50pub mod err;
51
52#[cfg(feature = "managed-pts")]
53pub mod ipc;
54
55#[cfg(feature = "managed-pts")]
56mod managed;
57
58use crate::config::{TransportConfig, TransportOptions};
59use crate::err::PtError;
60use oneshot_fused_workaround as oneshot;
61use std::collections::HashMap;
62use std::net::SocketAddr;
63use std::path::PathBuf;
64use std::sync::{Arc, RwLock};
65use tor_config_path::CfgPathResolver;
66use tor_linkspec::PtTransportName;
67use tor_rtcompat::Runtime;
68use tor_socksproto::SocksVersion;
69#[cfg(any(feature = "tor-channel-factory", feature = "managed-pts"))]
70use tracing::info;
71use tracing::warn;
72#[cfg(feature = "managed-pts")]
73use {
74    crate::managed::{PtReactor, PtReactorMessage},
75    futures::channel::mpsc::{self, UnboundedSender},
76    tor_error::error_report,
77    tor_rtcompat::SpawnExt,
78};
79#[cfg(feature = "tor-channel-factory")]
80use {
81    async_trait::async_trait,
82    tor_chanmgr::{
83        builder::ChanBuilder,
84        factory::{AbstractPtError, ChannelFactory},
85        transport::ExternalProxyPlugin,
86    },
87    tracing::trace,
88};
89
90/// Shared mutable state between the `PtReactor` and `PtMgr`.
91#[derive(Default, Debug)]
92struct PtSharedState {
93    /// Connection information for pluggable transports from currently running binaries.
94    ///
95    /// Unmanaged pluggable transports are not included in this map.
96    #[allow(dead_code)]
97    managed_cmethods: HashMap<PtTransportName, PtClientMethod>,
98    /// Current configured set of pluggable transports.
99    configured: HashMap<PtTransportName, TransportOptions>,
100}
101
102/// A pluggable transport manager knows how to make different
103/// kinds of connections to the Tor network, for censorship avoidance.
104pub struct PtMgr<R> {
105    /// An underlying `Runtime`, used to spawn background tasks.
106    #[allow(dead_code)]
107    runtime: R,
108    /// State for this `PtMgr` that's shared with the `PtReactor`.
109    state: Arc<RwLock<PtSharedState>>,
110    /// PtReactor channel when the `managed-pts` feature is enabled.
111    #[cfg(feature = "managed-pts")]
112    tx: UnboundedSender<PtReactorMessage>,
113}
114
115impl<R: Runtime> PtMgr<R> {
116    /// Transform the config into a more useful representation indexed by transport name.
117    fn transform_config(
118        binaries: Vec<TransportConfig>,
119    ) -> Result<HashMap<PtTransportName, TransportOptions>, tor_error::Bug> {
120        let mut ret = HashMap::new();
121        // FIXME(eta): You can currently specify overlapping protocols, and it'll
122        //             just use the last transport specified.
123        //             I attempted to fix this, but decided I didn't want to stare into the list
124        //             builder macro void after trying it for 15 minutes.
125        for thing in binaries {
126            for tn in thing.protocols.iter() {
127                ret.insert(tn.clone(), thing.clone().try_into()?);
128            }
129        }
130        for opt in ret.values() {
131            if let TransportOptions::Unmanaged(u) = opt {
132                if !u.is_localhost() {
133                    warn!(
134                        "Configured to connect to a PT on a non-local addresses. This is usually insecure! We recommend running PTs on localhost only."
135                    );
136                }
137            }
138        }
139        Ok(ret)
140    }
141
142    /// Create a new PtMgr.
143    // TODO: maybe don't have the Vec directly exposed?
144    pub fn new(
145        transports: Vec<TransportConfig>,
146        #[allow(unused)] state_dir: PathBuf,
147        path_resolver: Arc<CfgPathResolver>,
148        rt: R,
149    ) -> Result<Self, PtError> {
150        let state = PtSharedState {
151            managed_cmethods: Default::default(),
152            configured: Self::transform_config(transports)?,
153        };
154        let state = Arc::new(RwLock::new(state));
155
156        // reactor is only needed if we support managed pts
157        #[cfg(feature = "managed-pts")]
158        let tx = {
159            let (tx, rx) = mpsc::unbounded();
160
161            let mut reactor =
162                PtReactor::new(rt.clone(), state.clone(), rx, state_dir, path_resolver);
163            rt.spawn(async move {
164                loop {
165                    match reactor.run_one_step().await {
166                        Ok(true) => return,
167                        Ok(false) => {}
168                        Err(e) => {
169                            error_report!(e, "PtReactor failed");
170                            return;
171                        }
172                    }
173                }
174            })
175            .map_err(|e| PtError::Spawn { cause: Arc::new(e) })?;
176
177            tx
178        };
179
180        Ok(Self {
181            runtime: rt,
182            state,
183            #[cfg(feature = "managed-pts")]
184            tx,
185        })
186    }
187
188    /// Reload the configuration
189    pub fn reconfigure(
190        &self,
191        how: tor_config::Reconfigure,
192        transports: Vec<TransportConfig>,
193    ) -> Result<(), tor_config::ReconfigureError> {
194        let configured = Self::transform_config(transports)?;
195        if how == tor_config::Reconfigure::CheckAllOrNothing {
196            return Ok(());
197        }
198        {
199            let mut inner = self.state.write().expect("ptmgr poisoned");
200            inner.configured = configured;
201        }
202        // We don't have any way of propagating this sanely; the caller will find out the reactor
203        // has died later on anyway.
204        #[cfg(feature = "managed-pts")]
205        let _ = self.tx.unbounded_send(PtReactorMessage::Reconfigured);
206        Ok(())
207    }
208
209    /// Given a transport name, return a method that we can use to contact that transport.
210    ///
211    /// May have to launch a managed transport as needed.
212    ///
213    /// Returns Ok(None) if no such transport exists.
214    #[cfg(feature = "tor-channel-factory")]
215    async fn get_cmethod_for_transport(
216        &self,
217        transport: &PtTransportName,
218    ) -> Result<Option<PtClientMethod>, PtError> {
219        #[allow(unused)]
220        let (cfg, managed_cmethod) = {
221            // NOTE(eta): This is using a RwLock inside async code (but not across an await point).
222            //            Arguably this is fine since it's just a small read, and nothing should ever
223            //            hold this lock for very long.
224            let inner = self.state.read().expect("ptmgr poisoned");
225            let cfg = inner.configured.get(transport);
226            let managed_cmethod = inner.managed_cmethods.get(transport);
227            (cfg.cloned(), managed_cmethod.cloned())
228        };
229
230        match cfg {
231            Some(TransportOptions::Unmanaged(cfg)) => {
232                let cmethod = cfg.cmethod();
233                trace!(
234                    "Found configured unmanaged transport {transport} accessible via {cmethod:?}"
235                );
236                Ok(Some(cmethod))
237            }
238            #[cfg(feature = "managed-pts")]
239            Some(TransportOptions::Managed(_cfg)) => {
240                match managed_cmethod {
241                    // A configured-and-running cmethod.
242                    Some(cmethod) => {
243                        trace!(
244                            "Found configured managed transport {transport} accessible via {cmethod:?}"
245                        );
246                        Ok(Some(cmethod))
247                    }
248                    // A configured-but-not-running cmethod.
249                    None => {
250                        // There is going to be a lot happening "under the hood" here.
251                        //
252                        // When we are asked to get a ChannelFactory for a given
253                        // connection, we will need to:
254                        //    - launch the binary for that transport if it is not already running*.
255                        //    - If we launched the binary, talk to it and see which ports it
256                        //      is listening on.
257                        //    - Return a ChannelFactory that connects via one of those ports,
258                        //      using the appropriate version of SOCKS, passing K=V parameters
259                        //      encoded properly.
260                        //
261                        // * As in other managers, we'll need to avoid trying to launch the same
262                        //   transport twice if we get two concurrent requests.
263                        //
264                        // Later if the binary crashes, we should detect that.  We should relaunch
265                        // it on demand.
266                        //
267                        // On reconfigure, we should shut down any no-longer-used transports.
268                        //
269                        // Maybe, we should shut down transports that haven't been used
270                        // for a long time.
271                        Ok(Some(self.spawn_transport(transport).await?))
272                    }
273                }
274            }
275            // No configuration for this transport.
276            None => {
277                trace!("Got a request for transport {transport}, which is not configured.");
278                Ok(None)
279            }
280        }
281    }
282
283    /// Communicate with the PT reactor to launch a managed transport.
284    #[cfg(all(feature = "tor-channel-factory", feature = "managed-pts"))]
285    async fn spawn_transport(
286        &self,
287        transport: &PtTransportName,
288    ) -> Result<PtClientMethod, PtError> {
289        // Tell the reactor to spawn the PT, and wait for it.
290        // (The reactor will handle coalescing multiple requests.)
291        info!(
292            "Got a request for transport {transport}, which is not currently running. Launching it."
293        );
294
295        let (tx, rx) = oneshot::channel();
296        self.tx
297            .unbounded_send(PtReactorMessage::Spawn {
298                pt: transport.clone(),
299                result: tx,
300            })
301            .map_err(|_| {
302                PtError::Internal(tor_error::internal!("PT reactor closed unexpectedly"))
303            })?;
304
305        let method = match rx.await {
306            Err(_) => {
307                return Err(PtError::Internal(tor_error::internal!(
308                    "PT reactor closed unexpectedly"
309                )));
310            }
311            Ok(Err(e)) => {
312                warn!("PT for {transport} failed to launch: {e}");
313                return Err(e);
314            }
315            Ok(Ok(method)) => method,
316        };
317
318        info!("Successfully launched PT for {transport} at {method:?}.");
319        Ok(method)
320    }
321}
322
323/// A SOCKS endpoint to connect through a pluggable transport.
324#[derive(Debug, Clone, PartialEq, Eq)]
325pub struct PtClientMethod {
326    /// The SOCKS protocol version to use.
327    pub(crate) kind: SocksVersion,
328    /// The socket address to connect to.
329    pub(crate) endpoint: SocketAddr,
330}
331
332impl PtClientMethod {
333    /// Get the SOCKS protocol version to use.
334    pub fn kind(&self) -> SocksVersion {
335        self.kind
336    }
337
338    /// Get the socket address to connect to.
339    pub fn endpoint(&self) -> SocketAddr {
340        self.endpoint
341    }
342}
343
#[cfg(feature = "tor-channel-factory")]
#[async_trait]
impl<R: Runtime> tor_chanmgr::factory::AbstractPtMgr for PtMgr<R> {
    /// Build a channel factory that connects through the named transport.
    ///
    /// Looks up the client method for `transport` and wraps it in an
    /// `ExternalProxyPlugin`-backed `ChanBuilder`.  Yields `Ok(None)` when the
    /// lookup finds no such transport, and the lookup's error (behind an `Arc`)
    /// when it fails.
    async fn factory_for_transport(
        &self,
        transport: &PtTransportName,
    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
        let lookup = self.get_cmethod_for_transport(transport).await;
        let Some(cmethod) = lookup.map_err(|e| Arc::new(e) as Arc<dyn AbstractPtError>)? else {
            return Ok(None);
        };

        let runtime = &self.runtime;
        let proxy = ExternalProxyPlugin::new(runtime.clone(), cmethod.endpoint(), cmethod.kind());
        let factory = ChanBuilder::new_client(runtime.clone(), proxy);
        // FIXME(eta): Should we cache constructed factories? If no: should this still be an Arc?
        // FIXME(eta): Should we track what transports are live somehow, so we can shut them down?
        Ok(Some(Arc::new(factory)))
    }
}
363}