Skip to main content

tor_proto/relay/channel/
create_handler.rs

1//! Handler for CREATE* cells.
2
3use crate::FlowCtrlParameters;
4use crate::ccparams::{
5    Algorithm, AlgorithmDiscriminants, CongestionControlParams, CongestionWindowParams,
6    FixedWindowParams, RoundTripEstimatorParams, VegasParams,
7};
8use crate::channel::Channel;
9use crate::circuit::CircuitRxSender;
10use crate::circuit::UniqId;
11use crate::circuit::celltypes::{CreateRequest, CreateResponse};
12use crate::circuit::circhop::{HopNegotiationType, HopSettings};
13use crate::client::circuit::CircParameters;
14use crate::client::circuit::padding::PaddingController;
15use crate::crypto::binding::CircuitBinding;
16use crate::crypto::cell::CryptInit as _;
17use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer, RelayLayer, tor1};
18use crate::crypto::handshake::RelayHandshakeError;
19use crate::crypto::handshake::ServerHandshake as _;
20use crate::crypto::handshake::fast::CreateFastServer;
21use crate::crypto::handshake::ntor::{NtorSecretKey, NtorServer};
22use crate::memquota::SpecificAccount as _;
23use crate::memquota::{ChannelAccount, CircuitAccount};
24use crate::relay::channel_provider::ChannelProvider;
25use crate::relay::reactor::Reactor;
26use crate::relay::{IncomingStreamRequestFilter, RelayCirc};
27use smallvec::SmallVec;
28use std::sync::{Arc, RwLock, Weak};
29use tor_cell::chancell::ChanMsg as _;
30use tor_cell::chancell::CircId;
31use tor_cell::chancell::msg::{
32    CreateFast, Created2, CreatedFast, Destroy, DestroyReason, HandshakeType,
33};
34use tor_error::{Bug, ErrorKind, HasKind, debug_report, internal, into_internal};
35use tor_linkspec::OwnedChanTarget;
36use tor_llcrypto::cipher::aes::Aes128Ctr;
37use tor_llcrypto::d::Sha1;
38use tor_llcrypto::pk::ed25519::Ed25519Identity;
39use tor_llcrypto::pk::rsa::RsaIdentity;
40use tor_memquota::mq_queue::ChannelSpec as _;
41use tor_memquota::mq_queue::MpscSpec;
42use tor_relay_crypto::pk::{RelayNtorKeypair, RelayNtorKeys};
43use tor_rtcompat::SpawnExt as _;
44use tor_rtcompat::{DynTimeProvider, Runtime};
45use tracing::warn;
46
47/// Everything needed to handle CREATE* messages on channels.
48#[derive(derive_more::Debug)]
49pub struct CreateRequestHandler {
50    /// Something that can launch channels. Typically the `ChanMgr`.
51    chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
52    /// Circuit-related network parameters.
53    circ_net_params: RwLock<CircNetParameters>,
54    /// The circuit extension keys.
55    #[debug(skip)]
56    ntor_keys: RwLock<RelayNtorKeys>,
57    /// An [`IncomingStreamRequestFilter`] factory for checking whether the user wants
58    /// this request, or wants to reject it immediately.
59    ///
60    /// Used for obtaining a current [`IncomingStreamRequestFilter`]
61    /// for building a circuit reactor.
62    //
63    // TODO(relay): it's likely this will end up changing quite a bit once we start
64    // figuring out exactly how the config/reconfigure() logic and IncomingStreamRequestFilter
65    // should function for relays.
66    #[debug(skip)]
67    incoming_filter_factory: Box<dyn IncomingStreamRequestFilterFactory + Send + Sync>,
68}
69
70impl CreateRequestHandler {
71    /// Build a new [`CreateRequestHandler`].
72    pub fn new(
73        chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
74        circ_net_params: CircNetParameters,
75        ntor_keys: RelayNtorKeys,
76        incoming_filter_factory: Box<dyn IncomingStreamRequestFilterFactory + Send + Sync>,
77    ) -> Self {
78        Self {
79            chan_provider,
80            circ_net_params: RwLock::new(circ_net_params),
81            ntor_keys: RwLock::new(ntor_keys),
82            incoming_filter_factory,
83        }
84    }
85
86    /// Update the circuit parameters from a network consensus.
87    pub fn update_params(&self, circ_net_params: CircNetParameters) {
88        *self.circ_net_params.write().expect("rwlock poisoned") = circ_net_params;
89    }
90
91    /// Update the handler with a new set of circuit extension keys.
92    ///
93    /// This is called periodically by the relay key rotation task.
94    pub fn update_ntor_keys(&self, ntor_keys: RelayNtorKeys) {
95        *self.ntor_keys.write().expect("rwlock poisoned") = ntor_keys;
96    }
97
98    /// Handle a CREATE* cell.
99    ///
100    /// This intentionally does not return a [`crate::Error`] so that we don't accidentally shut
101    /// down the channel reactor when we really should be returning a DESTROY. Shutting down a
102    /// channel may cause us to leak information about paths of circuits travelling through this
103    /// relay. This is especially important here since we're handling data that is controllable from
104    /// the other end of the circuit.
105    #[allow(clippy::too_many_arguments)]
106    pub(crate) fn handle_create<R: Runtime>(
107        &self,
108        runtime: &R,
109        channel: &Arc<Channel>,
110        our_ed25519_id: &Ed25519Identity,
111        our_rsa_id: &RsaIdentity,
112        circ_id: CircId,
113        msg: &CreateRequest,
114        memquota: &ChannelAccount,
115        circ_unique_id: UniqId,
116    ) -> Result<(CreateResponse, RelayCircComponents), Destroy> {
117        let result = self.handle_create_inner(
118            runtime,
119            channel,
120            our_ed25519_id,
121            our_rsa_id,
122            circ_id,
123            msg,
124            memquota,
125            circ_unique_id,
126        );
127
128        match result {
129            Ok(x) => Ok(x),
130            Err(e) => {
131                // TODO(relay): The log messages throughout could be very noisy, so should have rate limiting.
132                let cmd = msg.cmd();
133                debug_report!(&e, %cmd, "Failed to handle circuit create request");
134
135                // `tor-spec/tearing-down-circuits.md`:
136                //
137                // > Implementations SHOULD always use the NONE reason to avoid side channels: [...]
138                Err(Destroy::new(DestroyReason::NONE))
139            }
140        }
141    }
142
143    /// See [`Self::handle_create`].
144    #[allow(clippy::too_many_arguments)]
145    fn handle_create_inner<R: Runtime>(
146        &self,
147        runtime: &R,
148        channel: &Arc<Channel>,
149        our_ed25519_id: &Ed25519Identity,
150        our_rsa_id: &RsaIdentity,
151        circ_id: CircId,
152        msg: &CreateRequest,
153        memquota: &ChannelAccount,
154        circ_unique_id: UniqId,
155    ) -> Result<(CreateResponse, RelayCircComponents), HandleCreateError> {
156        // Perform the handshake crypto and build the response.
157        let handshake_components = match msg {
158            CreateRequest::CreateFast(msg) => self.handle_create_fast(msg)?,
159            CreateRequest::Create2(msg) => match msg.handshake_type() {
160                HandshakeType::NTOR_V3 => self.handle_create2_ntorv3(msg.body(), our_ed25519_id)?,
161                HandshakeType::NTOR => self.handle_create2_ntor(msg.body(), our_rsa_id)?,
162                x @ HandshakeType::TAP | x => {
163                    return Err(HandleCreateError::Create2HandshakeType(x));
164                }
165            },
166        };
167
168        let memquota = CircuitAccount::new(memquota)?;
169
170        // We use a large mpsc queue here since a circuit should never block the channel,
171        // and we hope that memquota will help us if an attacker intentionally fills this buffer.
172        // We use `10_000_000` since `usize::MAX` causes `futures::channel::mpsc` to panic.
173        // TODO(relay): We should switch to an unbounded queue, but the circuit reactor is expecting
174        // a bounded queue.
175        let time_provider = DynTimeProvider::new(runtime.clone());
176        let account = memquota.as_raw_account();
177        let (sender, receiver) =
178            MpscSpec::new(10_000_000).new_mq(time_provider.clone(), account)?;
179        let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver);
180
181        // TODO(relay): Do we really want a client padding machine here?
182        let (padding_ctrl, padding_stream) =
183            crate::client::circuit::padding::new_padding(DynTimeProvider::new(runtime.clone()));
184
185        // Upgrade the channel provider, which in practice is the `ChanMgr` so this should not fail.
186        let Some(chan_provider) = self.chan_provider.upgrade() else {
187            return Err(internal!("Unable to upgrade weak `ChannelProvider`").into());
188        };
189
190        // Create an IncomingStreamRequestFilter for this circuit.
191        // This will get applied to every stream request (BEGIN, BEGIN_DIR, RESOLVE)
192        // arriving on the circuit.
193        //
194        // Note: once built, a circuit reactor's IncomingStreamRequestFilter cannot be changed
195        // (it's fixed for the entire duration of the circuit).
196        let incoming_filter = self.incoming_filter_factory.current_filter();
197
198        // Build the relay circuit reactor.
199        let (reactor, circ, _incoming_streams) = Reactor::new(
200            runtime.clone(),
201            channel,
202            circ_id,
203            circ_unique_id,
204            receiver,
205            handshake_components.crypto_in,
206            handshake_components.crypto_out,
207            &handshake_components.hop_settings,
208            chan_provider,
209            padding_ctrl.clone(),
210            padding_stream,
211            incoming_filter,
212            &memquota,
213        )
214        .map_err(into_internal!("Failed to start circuit reactor"))?;
215
216        // TODO(relay): send the incoming_streams stream to the handler in arti-relay
217
218        // Start the reactor in a task.
219        let () = runtime.spawn(async {
220            match reactor.run().await {
221                Ok(()) => {}
222                Err(e) => {
223                    debug_report!(e, "Relay circuit reactor exited with an error");
224                }
225            }
226        })?;
227
228        Ok((
229            handshake_components.response,
230            RelayCircComponents {
231                circ,
232                sender,
233                padding_ctrl,
234            },
235        ))
236    }
237
238    /// The handshake code for a CREATE_FAST request.
239    fn handle_create_fast(
240        &self,
241        msg: &CreateFast,
242    ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
243        // TODO(relay): We might want to offload this to a CPU worker in the future.
244        let (keygen, handshake_msg) = CreateFastServer::server(
245            &mut rand::rng(),
246            // The CREATE_FAST handshake doesn't accept or return extensions,
247            // so this `AuxDataReply` is a no-op.
248            &mut |_: &()| Some(()),
249            // The CREATE_FAST handshake doesn't use any keys.
250            &[()],
251            msg.handshake(),
252        )?;
253
254        let crypt = tor1::CryptStatePair::<Aes128Ctr, Sha1>::construct(keygen)
255            .map_err(into_internal!("Circuit crypt state construction failed"))?;
256
257        let circ_params = self
258            .circ_net_params
259            .read()
260            .expect("rwlock poisoned")
261            // CREATE_FAST always uses fixed-window flow control.
262            .as_circ_parameters(AlgorithmDiscriminants::FixedWindow)?;
263
264        // TODO(relay): I don't think that this is the right way to do this. It works for
265        // CREATE_FAST, but we might want to rethink it for CREATE2.
266        let protos = tor_protover::Protocols::default();
267        let hop_settings =
268            HopSettings::from_params_and_caps(HopNegotiationType::None, &circ_params, &protos)
269                .map_err(into_internal!("Unable to build `HopSettings`"))?;
270
271        let response = CreatedFast::new(handshake_msg);
272        let response = CreateResponse::CreatedFast(response);
273
274        let (crypto_out, crypto_in, _binding) = split_relay_layer(crypt);
275
276        Ok(CompletedHandshakeComponents {
277            response,
278            hop_settings,
279            crypto_out,
280            crypto_in,
281        })
282    }
283
284    /// The handshake code for a CREATE2 ntor (non-v3) request.
285    fn handle_create2_ntor(
286        &self,
287        msg_body: &[u8],
288        our_rsa_id: &RsaIdentity,
289    ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
290        let ntor_keys = self.ntor_keys(|k| {
291            NtorSecretKey::new(k.secret().clone(), *k.public().inner(), *our_rsa_id)
292        });
293
294        // TODO(relay): We might want to offload this to a CPU worker in the future.
295        let (keygen, handshake_msg) = NtorServer::server(
296            &mut rand::rng(),
297            // The ntor (non-v3) handshake doesn't accept or return extensions,
298            // so this `AuxDataReply` is a no-op.
299            &mut |_: &()| Some(()),
300            ntor_keys.as_ref(),
301            msg_body,
302        )?;
303
304        let crypt = tor1::CryptStatePair::<Aes128Ctr, Sha1>::construct(keygen)
305            .map_err(into_internal!("Circuit crypt state construction failed"))?;
306
307        let (crypto_out, crypto_in, _binding) = split_relay_layer(crypt);
308
309        let circ_params = self
310            .circ_net_params
311            .read()
312            .expect("rwlock poisoned")
313            // CREATE2 with ntor (non-v3) always uses fixed-window flow control.
314            .as_circ_parameters(AlgorithmDiscriminants::FixedWindow)?;
315
316        // TODO(relay): I don't think that this is the right way to do this. It works for
317        // ntor, but won't work well for ntor-v3.
318        let protos = tor_protover::Protocols::default();
319        let hop_settings =
320            HopSettings::from_params_and_caps(HopNegotiationType::None, &circ_params, &protos)
321                .map_err(into_internal!("Unable to build `HopSettings`"))?;
322
323        let response = Created2::new(handshake_msg);
324        let response = CreateResponse::Created2(response);
325
326        Ok(CompletedHandshakeComponents {
327            response,
328            hop_settings,
329            crypto_out,
330            crypto_in,
331        })
332    }
333
334    /// The handshake code for a CREATE2 ntor-v3 request.
335    fn handle_create2_ntorv3(
336        &self,
337        _msg_body: &[u8],
338        _our_ed25519_id: &Ed25519Identity,
339    ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
340        Err(HandleCreateError::Create2HandshakeType(
341            HandshakeType::NTOR_V3,
342        ))
343    }
344
345    /// Helper to get the ntor keypairs after some transformation `map`.
346    ///
347    /// The `map` transformation must be fast since it blocks a read lock.
348    /// The returned keys are sorted with the most recent key first.
349    ///
350    /// It would be nice if this just returned an iterator,
351    /// but the read lock prevents this.
352    fn ntor_keys<T>(&self, map: impl FnMut(&RelayNtorKeypair) -> T) -> impl AsRef<[T]> {
353        let ntor_keys = self.ntor_keys.read().expect("rwlock poisoned");
354        let ntor_keys = [Some(ntor_keys.latest()), ntor_keys.previous()];
355        ntor_keys
356            .into_iter()
357            .flatten()
358            .map(map)
359            .collect::<SmallVec<[T; 2]>>()
360    }
361}
362
363/// Helper function to split a `RelayLayer` into forward and backward type-erased trait objects.
364fn split_relay_layer<F, B>(
365    crypt: impl RelayLayer<F, B>,
366) -> (
367    Box<dyn OutboundRelayLayer + Send>,
368    Box<dyn InboundRelayLayer + Send>,
369    CircuitBinding,
370)
371where
372    F: OutboundRelayLayer + Send + 'static,
373    B: InboundRelayLayer + Send + 'static,
374{
375    let (crypto_out, crypto_in, binding) = crypt.split_relay_layer();
376    let (crypto_out, crypto_in) = (Box::new(crypto_out), Box::new(crypto_in));
377
378    (crypto_out, crypto_in, binding)
379}
380
381/// An error that occurred while handling a CREATE* request.
382#[derive(Debug, thiserror::Error)]
383enum HandleCreateError {
384    /// Circuit relay handshake failed.
385    #[error("Circuit relay handshake failed")]
386    Handshake(#[from] RelayHandshakeError),
387    /// The requested handshake type is unsupported.
388    #[error("Unsupported handshake type {0}")]
389    Create2HandshakeType(HandshakeType),
390    /// A memquota error.
391    #[error("Memquota error")]
392    Memquota(#[from] tor_memquota::Error),
393    /// Error when spawning a task.
394    #[error("Runtime task spawn error")]
395    Spawn(#[from] futures::task::SpawnError),
396    /// An internal error.
397    ///
398    /// Note that other variants (such as `Handshake` containing a [`RelayHandshakeError`])
399    /// may themselves contain internal errors.
400    #[error("Internal error")]
401    Internal(#[from] tor_error::Bug),
402}
403
404impl HasKind for HandleCreateError {
405    fn kind(&self) -> ErrorKind {
406        match self {
407            Self::Handshake(e) => e.kind(),
408            Self::Create2HandshakeType(_) => ErrorKind::NotImplemented,
409            Self::Memquota(e) => e.kind(),
410            Self::Spawn(e) => e.kind(),
411            Self::Internal(_) => ErrorKind::Internal,
412        }
413    }
414}
415
416/// The components of a completed CREATE* handshake.
417struct CompletedHandshakeComponents {
418    /// The message to send in response.
419    response: CreateResponse,
420    /// The negotiated hop settings.
421    hop_settings: HopSettings,
422    /// Outbound onion crypto.
423    crypto_out: Box<dyn OutboundRelayLayer + Send>,
424    /// Inbound onion crypto.
425    crypto_in: Box<dyn InboundRelayLayer + Send>,
426}
427
428/// A collection of objects built for a new relay circuit.
429pub(crate) struct RelayCircComponents {
430    /// The relay circuit handle.
431    pub(crate) circ: Arc<RelayCirc>,
432    /// Used to send data from the channel to the circuit reactor.
433    pub(crate) sender: CircuitRxSender,
434    /// The circuit's padding controller.
435    pub(crate) padding_ctrl: PaddingController,
436}
437
438/// Congestion control network parameters.
439#[derive(Debug, Clone)]
440#[allow(clippy::exhaustive_structs)]
441pub struct CongestionControlNetParams {
442    /// Fixed-window algorithm parameters.
443    pub fixed_window: FixedWindowParams,
444
445    /// Vegas algorithm parameters for exit circuits.
446    // NOTE: In this module we are handling CREATE* cells,
447    // which only happens for non-hs circuits.
448    // So we don't need to store the vegas hs parameters here.
449    pub vegas_exit: VegasParams,
450
451    /// Congestion window parameters.
452    pub cwnd: CongestionWindowParams,
453
454    /// RTT calculation parameters.
455    pub rtt: RoundTripEstimatorParams,
456
457    /// Flow control parameters to use for all streams on this circuit.
458    pub flow_ctrl: FlowCtrlParameters,
459}
460
461impl CongestionControlNetParams {
462    #[cfg(test)]
463    // These have been copied from C-tor.
464    pub(crate) fn defaults_for_tests() -> Self {
465        Self {
466            fixed_window: FixedWindowParams::defaults_for_tests(),
467            vegas_exit: VegasParams::defaults_for_tests(),
468            cwnd: CongestionWindowParams::defaults_for_tests(),
469            rtt: RoundTripEstimatorParams::defaults_for_tests(),
470            flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
471        }
472    }
473}
474
475/// Network consensus parameters for handling incoming circuits.
476///
477/// Unlike `CircParameters`,
478/// this is unopinionated and contains all relevant consensus parameters,
479/// which is needed when handling an incoming CREATE* request where the
480/// circuit origin chooses the type/settings
481/// (for example congestion control type) of the circuit.
482#[derive(Debug, Clone)]
483#[allow(clippy::exhaustive_structs)]
484pub struct CircNetParameters {
485    /// Whether we should include ed25519 identities when we send EXTEND2 cells.
486    pub extend_by_ed25519_id: bool,
487
488    /// Congestion control network parameters.
489    pub cc: CongestionControlNetParams,
490}
491
492impl CircNetParameters {
493    /// Convert the [`CircNetParameters`] into a [`CircParameters`].
494    ///
495    /// We expect the circuit creation handshake to know what congestion control algorithm was
496    /// negotiated, and provide that as `algorithm`.
497    //
498    // We disable `unused` warnings at the root of tor-proto,
499    // but it's nice to have here so we re-enable it.
500    #[warn(unused)]
501    fn as_circ_parameters(&self, algorithm: AlgorithmDiscriminants) -> Result<CircParameters, Bug> {
502        // Unpack everything to make sure that we aren't missing anything
503        // (otherwise clippy would warn).
504        let Self {
505            extend_by_ed25519_id,
506            cc:
507                CongestionControlNetParams {
508                    fixed_window,
509                    vegas_exit,
510                    cwnd,
511                    rtt,
512                    flow_ctrl,
513                },
514        } = self;
515
516        let algorithm = match algorithm {
517            AlgorithmDiscriminants::FixedWindow => Algorithm::FixedWindow(*fixed_window),
518            AlgorithmDiscriminants::Vegas => Algorithm::Vegas(*vegas_exit),
519        };
520
521        // TODO(arti#2442): The builder pattern here seems like a footgun.
522        let cc = CongestionControlParams::builder()
523            .alg(algorithm)
524            .fixed_window_params(*fixed_window)
525            .cwnd_params(*cwnd)
526            .rtt_params(rtt.clone())
527            .build()
528            .map_err(into_internal!("Could not build `CongestionControlParams`"))?;
529
530        Ok(CircParameters::new(
531            *extend_by_ed25519_id,
532            cc,
533            flow_ctrl.clone(),
534        ))
535    }
536}
537
538/// An [`IncomingStreamRequestFilter`] factory for building [`IncomingStreamRequestFilter`]s.
539///
540/// Each time a new circuit is opened, the [`CreateRequestHandler`] calls
541/// [`IncomingStreamRequestFilterFactory::current_filter`] to build
542/// an [`IncomingStreamRequestFilter`] for the circuit.
543pub trait IncomingStreamRequestFilterFactory {
544    /// Return the [`IncomingStreamRequestFilter`] to apply to the incoming stream requests
545    /// arriving on a circuit.
546    fn current_filter(&self) -> Box<dyn IncomingStreamRequestFilter>;
547}
548
549impl<F> IncomingStreamRequestFilterFactory for F
550where
551    F: Fn() -> Box<dyn IncomingStreamRequestFilter>,
552{
553    fn current_filter(&self) -> Box<dyn IncomingStreamRequestFilter> {
554        (self)()
555    }
556}