1use crate::FlowCtrlParameters;
4use crate::ccparams::{
5 Algorithm, AlgorithmDiscriminants, CongestionControlParams, CongestionWindowParams,
6 FixedWindowParams, RoundTripEstimatorParams, VegasParams,
7};
8use crate::channel::Channel;
9use crate::circuit::CircuitRxSender;
10use crate::circuit::UniqId;
11use crate::circuit::celltypes::{CreateRequest, CreateResponse};
12use crate::circuit::circhop::{HopNegotiationType, HopSettings};
13use crate::client::circuit::CircParameters;
14use crate::client::circuit::padding::PaddingController;
15use crate::crypto::binding::CircuitBinding;
16use crate::crypto::cell::CryptInit as _;
17use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer, RelayLayer, tor1};
18use crate::crypto::handshake::RelayHandshakeError;
19use crate::crypto::handshake::ServerHandshake as _;
20use crate::crypto::handshake::fast::CreateFastServer;
21use crate::crypto::handshake::ntor::{NtorSecretKey, NtorServer};
22use crate::memquota::SpecificAccount as _;
23use crate::memquota::{ChannelAccount, CircuitAccount};
24use crate::relay::channel_provider::ChannelProvider;
25use crate::relay::reactor::Reactor;
26use crate::relay::{IncomingStreamRequestFilter, RelayCirc};
27use smallvec::SmallVec;
28use std::sync::{Arc, RwLock, Weak};
29use tor_cell::chancell::ChanMsg as _;
30use tor_cell::chancell::CircId;
31use tor_cell::chancell::msg::{
32 CreateFast, Created2, CreatedFast, Destroy, DestroyReason, HandshakeType,
33};
34use tor_error::{Bug, ErrorKind, HasKind, debug_report, internal, into_internal};
35use tor_linkspec::OwnedChanTarget;
36use tor_llcrypto::cipher::aes::Aes128Ctr;
37use tor_llcrypto::d::Sha1;
38use tor_llcrypto::pk::ed25519::Ed25519Identity;
39use tor_llcrypto::pk::rsa::RsaIdentity;
40use tor_memquota::mq_queue::ChannelSpec as _;
41use tor_memquota::mq_queue::MpscSpec;
42use tor_relay_crypto::pk::{RelayNtorKeypair, RelayNtorKeys};
43use tor_rtcompat::SpawnExt as _;
44use tor_rtcompat::{DynTimeProvider, Runtime};
45use tracing::warn;
46
/// Relay-side handler for incoming circuit creation requests.
///
/// Bundles the state that `handle_create` needs in order to answer a
/// [`CreateRequest`] and to launch the reactor of the resulting circuit.
/// The network parameters and the ntor keys sit behind [`RwLock`]s so that they
/// can be replaced through a shared reference.
#[derive(derive_more::Debug)]
pub struct CreateRequestHandler {
    /// Weak reference to the channel provider.
    ///
    /// It is upgraded every time a circuit is created and the resulting strong
    /// reference is handed to the new circuit reactor. Circuit creation fails
    /// with an internal error if the provider has already been dropped.
    chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
    /// The circuit-related network parameters currently in effect.
    circ_net_params: RwLock<CircNetParameters>,
    /// The ntor keys currently in effect (omitted from the `Debug` output).
    #[debug(skip)]
    ntor_keys: RwLock<RelayNtorKeys>,
    /// Factory that is asked once per new circuit for the incoming stream request
    /// filter to give to the reactor (omitted from the `Debug` output).
    #[debug(skip)]
    incoming_filter_factory: Box<dyn IncomingStreamRequestFilterFactory + Send + Sync>,
}
69
70impl CreateRequestHandler {
71 pub fn new(
73 chan_provider: Weak<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
74 circ_net_params: CircNetParameters,
75 ntor_keys: RelayNtorKeys,
76 incoming_filter_factory: Box<dyn IncomingStreamRequestFilterFactory + Send + Sync>,
77 ) -> Self {
78 Self {
79 chan_provider,
80 circ_net_params: RwLock::new(circ_net_params),
81 ntor_keys: RwLock::new(ntor_keys),
82 incoming_filter_factory,
83 }
84 }
85
86 pub fn update_params(&self, circ_net_params: CircNetParameters) {
88 *self.circ_net_params.write().expect("rwlock poisoned") = circ_net_params;
89 }
90
91 pub fn update_ntor_keys(&self, ntor_keys: RelayNtorKeys) {
95 *self.ntor_keys.write().expect("rwlock poisoned") = ntor_keys;
96 }
97
98 #[allow(clippy::too_many_arguments)]
106 pub(crate) fn handle_create<R: Runtime>(
107 &self,
108 runtime: &R,
109 channel: &Arc<Channel>,
110 our_ed25519_id: &Ed25519Identity,
111 our_rsa_id: &RsaIdentity,
112 circ_id: CircId,
113 msg: &CreateRequest,
114 memquota: &ChannelAccount,
115 circ_unique_id: UniqId,
116 ) -> Result<(CreateResponse, RelayCircComponents), Destroy> {
117 let result = self.handle_create_inner(
118 runtime,
119 channel,
120 our_ed25519_id,
121 our_rsa_id,
122 circ_id,
123 msg,
124 memquota,
125 circ_unique_id,
126 );
127
128 match result {
129 Ok(x) => Ok(x),
130 Err(e) => {
131 let cmd = msg.cmd();
133 debug_report!(&e, %cmd, "Failed to handle circuit create request");
134
135 Err(Destroy::new(DestroyReason::NONE))
139 }
140 }
141 }
142
143 #[allow(clippy::too_many_arguments)]
145 fn handle_create_inner<R: Runtime>(
146 &self,
147 runtime: &R,
148 channel: &Arc<Channel>,
149 our_ed25519_id: &Ed25519Identity,
150 our_rsa_id: &RsaIdentity,
151 circ_id: CircId,
152 msg: &CreateRequest,
153 memquota: &ChannelAccount,
154 circ_unique_id: UniqId,
155 ) -> Result<(CreateResponse, RelayCircComponents), HandleCreateError> {
156 let handshake_components = match msg {
158 CreateRequest::CreateFast(msg) => self.handle_create_fast(msg)?,
159 CreateRequest::Create2(msg) => match msg.handshake_type() {
160 HandshakeType::NTOR_V3 => self.handle_create2_ntorv3(msg.body(), our_ed25519_id)?,
161 HandshakeType::NTOR => self.handle_create2_ntor(msg.body(), our_rsa_id)?,
162 x @ HandshakeType::TAP | x => {
163 return Err(HandleCreateError::Create2HandshakeType(x));
164 }
165 },
166 };
167
168 let memquota = CircuitAccount::new(memquota)?;
169
170 let time_provider = DynTimeProvider::new(runtime.clone());
176 let account = memquota.as_raw_account();
177 let (sender, receiver) =
178 MpscSpec::new(10_000_000).new_mq(time_provider.clone(), account)?;
179 let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver);
180
181 let (padding_ctrl, padding_stream) =
183 crate::client::circuit::padding::new_padding(DynTimeProvider::new(runtime.clone()));
184
185 let Some(chan_provider) = self.chan_provider.upgrade() else {
187 return Err(internal!("Unable to upgrade weak `ChannelProvider`").into());
188 };
189
190 let incoming_filter = self.incoming_filter_factory.current_filter();
197
198 let (reactor, circ, _incoming_streams) = Reactor::new(
200 runtime.clone(),
201 channel,
202 circ_id,
203 circ_unique_id,
204 receiver,
205 handshake_components.crypto_in,
206 handshake_components.crypto_out,
207 &handshake_components.hop_settings,
208 chan_provider,
209 padding_ctrl.clone(),
210 padding_stream,
211 incoming_filter,
212 &memquota,
213 )
214 .map_err(into_internal!("Failed to start circuit reactor"))?;
215
216 let () = runtime.spawn(async {
220 match reactor.run().await {
221 Ok(()) => {}
222 Err(e) => {
223 debug_report!(e, "Relay circuit reactor exited with an error");
224 }
225 }
226 })?;
227
228 Ok((
229 handshake_components.response,
230 RelayCircComponents {
231 circ,
232 sender,
233 padding_ctrl,
234 },
235 ))
236 }
237
238 fn handle_create_fast(
240 &self,
241 msg: &CreateFast,
242 ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
243 let (keygen, handshake_msg) = CreateFastServer::server(
245 &mut rand::rng(),
246 &mut |_: &()| Some(()),
249 &[()],
251 msg.handshake(),
252 )?;
253
254 let crypt = tor1::CryptStatePair::<Aes128Ctr, Sha1>::construct(keygen)
255 .map_err(into_internal!("Circuit crypt state construction failed"))?;
256
257 let circ_params = self
258 .circ_net_params
259 .read()
260 .expect("rwlock poisoned")
261 .as_circ_parameters(AlgorithmDiscriminants::FixedWindow)?;
263
264 let protos = tor_protover::Protocols::default();
267 let hop_settings =
268 HopSettings::from_params_and_caps(HopNegotiationType::None, &circ_params, &protos)
269 .map_err(into_internal!("Unable to build `HopSettings`"))?;
270
271 let response = CreatedFast::new(handshake_msg);
272 let response = CreateResponse::CreatedFast(response);
273
274 let (crypto_out, crypto_in, _binding) = split_relay_layer(crypt);
275
276 Ok(CompletedHandshakeComponents {
277 response,
278 hop_settings,
279 crypto_out,
280 crypto_in,
281 })
282 }
283
284 fn handle_create2_ntor(
286 &self,
287 msg_body: &[u8],
288 our_rsa_id: &RsaIdentity,
289 ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
290 let ntor_keys = self.ntor_keys(|k| {
291 NtorSecretKey::new(k.secret().clone(), *k.public().inner(), *our_rsa_id)
292 });
293
294 let (keygen, handshake_msg) = NtorServer::server(
296 &mut rand::rng(),
297 &mut |_: &()| Some(()),
300 ntor_keys.as_ref(),
301 msg_body,
302 )?;
303
304 let crypt = tor1::CryptStatePair::<Aes128Ctr, Sha1>::construct(keygen)
305 .map_err(into_internal!("Circuit crypt state construction failed"))?;
306
307 let (crypto_out, crypto_in, _binding) = split_relay_layer(crypt);
308
309 let circ_params = self
310 .circ_net_params
311 .read()
312 .expect("rwlock poisoned")
313 .as_circ_parameters(AlgorithmDiscriminants::FixedWindow)?;
315
316 let protos = tor_protover::Protocols::default();
319 let hop_settings =
320 HopSettings::from_params_and_caps(HopNegotiationType::None, &circ_params, &protos)
321 .map_err(into_internal!("Unable to build `HopSettings`"))?;
322
323 let response = Created2::new(handshake_msg);
324 let response = CreateResponse::Created2(response);
325
326 Ok(CompletedHandshakeComponents {
327 response,
328 hop_settings,
329 crypto_out,
330 crypto_in,
331 })
332 }
333
334 fn handle_create2_ntorv3(
336 &self,
337 _msg_body: &[u8],
338 _our_ed25519_id: &Ed25519Identity,
339 ) -> Result<CompletedHandshakeComponents, HandleCreateError> {
340 Err(HandleCreateError::Create2HandshakeType(
341 HandshakeType::NTOR_V3,
342 ))
343 }
344
345 fn ntor_keys<T>(&self, map: impl FnMut(&RelayNtorKeypair) -> T) -> impl AsRef<[T]> {
353 let ntor_keys = self.ntor_keys.read().expect("rwlock poisoned");
354 let ntor_keys = [Some(ntor_keys.latest()), ntor_keys.previous()];
355 ntor_keys
356 .into_iter()
357 .flatten()
358 .map(map)
359 .collect::<SmallVec<[T; 2]>>()
360 }
361}
362
363fn split_relay_layer<F, B>(
365 crypt: impl RelayLayer<F, B>,
366) -> (
367 Box<dyn OutboundRelayLayer + Send>,
368 Box<dyn InboundRelayLayer + Send>,
369 CircuitBinding,
370)
371where
372 F: OutboundRelayLayer + Send + 'static,
373 B: InboundRelayLayer + Send + 'static,
374{
375 let (crypto_out, crypto_in, binding) = crypt.split_relay_layer();
376 let (crypto_out, crypto_in) = (Box::new(crypto_out), Box::new(crypto_in));
377
378 (crypto_out, crypto_in, binding)
379}
380
/// The reasons for which handling a circuit creation request can fail.
#[derive(Debug, thiserror::Error)]
enum HandleCreateError {
    /// The server side of the circuit handshake failed.
    #[error("Circuit relay handshake failed")]
    Handshake(#[from] RelayHandshakeError),
    /// The request asked for a handshake type that is not handled.
    #[error("Unsupported handshake type {0}")]
    Create2HandshakeType(HandshakeType),
    /// A memory-quota operation failed.
    #[error("Memquota error")]
    Memquota(#[from] tor_memquota::Error),
    /// The runtime could not spawn a task.
    #[error("Runtime task spawn error")]
    Spawn(#[from] futures::task::SpawnError),
    /// An internal error (a bug) occurred.
    #[error("Internal error")]
    Internal(#[from] tor_error::Bug),
}
403
404impl HasKind for HandleCreateError {
405 fn kind(&self) -> ErrorKind {
406 match self {
407 Self::Handshake(e) => e.kind(),
408 Self::Create2HandshakeType(_) => ErrorKind::NotImplemented,
409 Self::Memquota(e) => e.kind(),
410 Self::Spawn(e) => e.kind(),
411 Self::Internal(_) => ErrorKind::Internal,
412 }
413 }
414}
415
/// Everything a successfully completed circuit handshake produces.
struct CompletedHandshakeComponents {
    /// The response to send back to the party that requested the circuit.
    response: CreateResponse,
    /// The hop settings handed to the circuit reactor.
    hop_settings: HopSettings,
    /// The outbound relay crypto layer.
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
    /// The inbound relay crypto layer.
    crypto_in: Box<dyn InboundRelayLayer + Send>,
}
427
/// The parts of a newly created relay circuit that are handed back to the caller
/// of `CreateRequestHandler::handle_create`.
pub(crate) struct RelayCircComponents {
    /// Handle to the relay circuit, as returned by `Reactor::new`.
    pub(crate) circ: Arc<RelayCirc>,
    /// Sending half of the queue whose receiving half was given to the reactor.
    pub(crate) sender: CircuitRxSender,
    /// The padding controller; a clone of it was given to the reactor.
    pub(crate) padding_ctrl: PaddingController,
}
437
/// Congestion control (and flow control) parameters taken from the network.
#[derive(Debug, Clone)]
#[allow(clippy::exhaustive_structs)]
pub struct CongestionControlNetParams {
    /// Parameters of the fixed-window algorithm.
    pub fixed_window: FixedWindowParams,

    /// Parameters used when the Vegas algorithm is selected.
    // NOTE(review): the name suggests these are the Vegas parameters for exit
    // circuits specifically — confirm.
    pub vegas_exit: VegasParams,

    /// Congestion window parameters.
    pub cwnd: CongestionWindowParams,

    /// Round-trip time estimator parameters.
    pub rtt: RoundTripEstimatorParams,

    /// Flow control parameters, passed through to `CircParameters`.
    pub flow_ctrl: FlowCtrlParameters,
}
460
461impl CongestionControlNetParams {
462 #[cfg(test)]
463 pub(crate) fn defaults_for_tests() -> Self {
465 Self {
466 fixed_window: FixedWindowParams::defaults_for_tests(),
467 vegas_exit: VegasParams::defaults_for_tests(),
468 cwnd: CongestionWindowParams::defaults_for_tests(),
469 rtt: RoundTripEstimatorParams::defaults_for_tests(),
470 flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
471 }
472 }
473}
474
/// Circuit-related parameters taken from the network.
///
/// These are converted into a `CircParameters` value once the congestion control
/// algorithm to use for a circuit is known.
#[derive(Debug, Clone)]
#[allow(clippy::exhaustive_structs)]
pub struct CircNetParameters {
    /// Passed unchanged as the first argument of `CircParameters::new`.
    // NOTE(review): presumably controls whether Ed25519 identities are used when
    // extending circuits — confirm against `CircParameters`.
    pub extend_by_ed25519_id: bool,

    /// Congestion control parameters.
    pub cc: CongestionControlNetParams,
}
491
492impl CircNetParameters {
493 #[warn(unused)]
501 fn as_circ_parameters(&self, algorithm: AlgorithmDiscriminants) -> Result<CircParameters, Bug> {
502 let Self {
505 extend_by_ed25519_id,
506 cc:
507 CongestionControlNetParams {
508 fixed_window,
509 vegas_exit,
510 cwnd,
511 rtt,
512 flow_ctrl,
513 },
514 } = self;
515
516 let algorithm = match algorithm {
517 AlgorithmDiscriminants::FixedWindow => Algorithm::FixedWindow(*fixed_window),
518 AlgorithmDiscriminants::Vegas => Algorithm::Vegas(*vegas_exit),
519 };
520
521 let cc = CongestionControlParams::builder()
523 .alg(algorithm)
524 .fixed_window_params(*fixed_window)
525 .cwnd_params(*cwnd)
526 .rtt_params(rtt.clone())
527 .build()
528 .map_err(into_internal!("Could not build `CongestionControlParams`"))?;
529
530 Ok(CircParameters::new(
531 *extend_by_ed25519_id,
532 cc,
533 flow_ctrl.clone(),
534 ))
535 }
536}
537
/// A source of [`IncomingStreamRequestFilter`]s.
///
/// `CreateRequestHandler` asks its factory for a filter each time it creates a
/// circuit, and gives that filter to the circuit reactor.
pub trait IncomingStreamRequestFilterFactory {
    /// Returns the filter to use for a circuit that is being created now.
    fn current_filter(&self) -> Box<dyn IncomingStreamRequestFilter>;
}
548
549impl<F> IncomingStreamRequestFilterFactory for F
550where
551 F: Fn() -> Box<dyn IncomingStreamRequestFilter>,
552{
553 fn current_filter(&self) -> Box<dyn IncomingStreamRequestFilter> {
554 (self)()
555 }
556}