1pub(crate) mod halfcirc;
39
40#[cfg(feature = "hs-common")]
41pub mod handshake;
42#[cfg(not(feature = "hs-common"))]
43pub(crate) mod handshake;
44
45pub(crate) mod padding;
46
47pub(super) mod path;
48
49use crate::channel::Channel;
50use crate::circuit::circhop::{HopNegotiationType, HopSettings};
51use crate::circuit::{CircuitRxReceiver, celltypes::*};
52#[cfg(feature = "circ-padding-manual")]
53use crate::client::CircuitPadder;
54use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
55use crate::client::reactor::{CircuitHandshake, CtrlCmd, CtrlMsg, Reactor};
56use crate::crypto::cell::HopNum;
57use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
58use crate::memquota::CircuitAccount;
59use crate::util::skew::ClockSkew;
60use crate::{Error, Result};
61use derive_deftly::Deftly;
62use educe::Educe;
63use path::HopDetail;
64use tor_cell::chancell::{
65 CircId,
66 msg::{self as chanmsg},
67};
68use tor_error::{bad_api_usage, internal, into_internal};
69use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
70use tor_protover::named;
71use tor_rtcompat::DynTimeProvider;
72use web_time_compat::Instant;
73
74use crate::circuit::UniqId;
75
76use super::{ClientTunnel, TargetHop};
77
78use futures::channel::mpsc;
79use oneshot_fused_workaround as oneshot;
80
81use futures::FutureExt as _;
82use std::collections::HashMap;
83use std::sync::{Arc, Mutex};
84use tor_memquota::derive_deftly_template_HasMemoryCost;
85
86use crate::crypto::handshake::ntor::NtorPublicKey;
87
88pub use crate::crypto::binding::CircuitBinding;
89pub use path::{Path, PathEntry};
90
91pub use crate::circuit::CircParameters;
93
94pub use crate::util::timeout::TimeoutEstimator;
96
97#[derive(Debug, Deftly)]
100#[allow(unreachable_pub)] #[derive_deftly(HasMemoryCost)]
102#[derive_deftly(RestrictedChanMsgSet)]
103#[deftly(usage = "on an open client circuit")]
104pub(super) enum ClientCircChanMsg {
105 Relay(chanmsg::Relay),
108 Destroy(chanmsg::Destroy),
110 }
112
113#[derive(Debug)]
114pub struct ClientCirc {
157 pub(super) mutable: Arc<TunnelMutableState>,
159 unique_id: UniqId,
161 pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
163 pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
165 #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
168 reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
169 #[cfg(test)]
171 circid: CircId,
172 pub(super) memquota: CircuitAccount,
174 pub(super) time_provider: DynTimeProvider,
176 pub(super) is_multi_path: bool,
186}
187
188#[derive(Debug, Default)]
208pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
209
210impl TunnelMutableState {
211 pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
213 #[allow(unused)] let state = self
215 .0
216 .lock()
217 .expect("lock poisoned")
218 .insert(unique_id, mutable);
219
220 debug_assert!(state.is_none());
221 }
222
223 pub(super) fn remove(&self, unique_id: UniqId) {
225 #[allow(unused)] let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
227
228 debug_assert!(state.is_some());
229 }
230
231 fn all_paths(&self) -> Vec<Arc<Path>> {
233 let lock = self.0.lock().expect("lock poisoned");
234 lock.values().map(|mutable| mutable.path()).collect()
235 }
236
237 #[cfg(feature = "rpc")]
243 pub(super) fn tagged_paths(&self) -> HashMap<UniqId, Arc<Path>> {
244 let lock = self.0.lock().expect("lock poisoned");
245 lock.iter()
246 .map(|(id, mutable)| (*id, mutable.path()))
247 .collect()
248 }
249
250 #[allow(unstable_name_collisions)]
258 fn single_path(&self) -> Result<Arc<Path>> {
259 use itertools::Itertools as _;
260
261 self.all_paths().into_iter().exactly_one().map_err(|_| {
262 bad_api_usage!("requested the single path of a multi-path tunnel?!").into()
263 })
264 }
265
266 fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
271 let lock = self.0.lock().expect("lock poisoned");
272 let mutable = lock
273 .get(&unique_id)
274 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
275
276 let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
277 path::HopDetail::Relay(r) => r,
278 #[cfg(feature = "hs-common")]
279 path::HopDetail::Virtual => {
280 panic!("somehow made a circuit with a virtual first hop.")
281 }
282 });
283
284 Ok(first_hop)
285 }
286
287 pub(super) fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
293 let lock = self.0.lock().expect("lock poisoned");
294 let mutable = lock
295 .get(&unique_id)
296 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
297
298 Ok(mutable.last_hop_num())
299 }
300
301 fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
305 let lock = self.0.lock().expect("lock poisoned");
306 let mutable = lock
307 .get(&unique_id)
308 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
309
310 Ok(mutable.n_hops())
311 }
312}
313
314#[derive(Educe, Default)]
316#[educe(Debug)]
317pub(super) struct MutableState(Mutex<CircuitState>);
318
319impl MutableState {
320 pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
322 let mut mutable = self.0.lock().expect("poisoned lock");
323 Arc::make_mut(&mut mutable.path).push_hop(peer_id);
324 mutable.binding.push(binding);
325 }
326
327 pub(super) fn path(&self) -> Arc<path::Path> {
329 let mutable = self.0.lock().expect("poisoned lock");
330 Arc::clone(&mutable.path)
331 }
332
333 pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
336 let mutable = self.0.lock().expect("poisoned lock");
337
338 mutable.binding.get::<usize>(hop.into()).cloned().flatten()
339 }
342
343 fn first_hop(&self) -> Option<HopDetail> {
345 let mutable = self.0.lock().expect("poisoned lock");
346 mutable.path.first_hop()
347 }
348
349 fn last_hop_num(&self) -> Option<HopNum> {
356 let mutable = self.0.lock().expect("poisoned lock");
357 mutable.path.last_hop_num()
358 }
359
360 fn n_hops(&self) -> usize {
367 let mutable = self.0.lock().expect("poisoned lock");
368 mutable.path.n_hops()
369 }
370}
371
372#[derive(Educe, Default)]
374#[educe(Debug)]
375pub(super) struct CircuitState {
376 path: Arc<path::Path>,
382
383 #[educe(Debug(ignore))]
391 binding: Vec<Option<CircuitBinding>>,
392}
393
394pub struct PendingClientTunnel {
399 recvcreated: oneshot::Receiver<CreateResponse>,
402 circ: ClientCirc,
404}
405
406impl ClientCirc {
407 pub fn into_tunnel(self) -> Result<ClientTunnel> {
409 self.try_into()
410 }
411
412 pub fn first_hop(&self) -> Result<OwnedChanTarget> {
420 Ok(self
421 .mutable
422 .first_hop(self.unique_id)
423 .map_err(|_| Error::CircuitClosed)?
424 .expect("called first_hop on an un-constructed circuit"))
425 }
426
427 pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
437 let all_paths = self.all_paths();
438 let path = all_paths.first().ok_or_else(|| {
439 tor_error::bad_api_usage!("Called last_hop_info on an un-constructed tunnel")
440 })?;
441 Ok(path
442 .hops()
443 .last()
444 .expect("Called last_hop on an un-constructed circuit")
445 .as_chan_target()
446 .map(OwnedChanTarget::from_chan_target))
447 }
448
449 pub fn last_hop_num(&self) -> Result<HopNum> {
459 Ok(self
460 .mutable
461 .last_hop_num(self.unique_id)?
462 .ok_or_else(|| internal!("no last hop index"))?)
463 }
464
465 pub fn last_hop(&self) -> Result<TargetHop> {
470 let hop_num = self
471 .mutable
472 .last_hop_num(self.unique_id)?
473 .ok_or_else(|| bad_api_usage!("no last hop"))?;
474 Ok((self.unique_id, hop_num).into())
475 }
476
477 pub fn all_paths(&self) -> Vec<Arc<Path>> {
482 self.mutable.all_paths()
483 }
484
485 pub fn single_path(&self) -> Result<Arc<Path>> {
489 self.mutable.single_path()
490 }
491
492 pub async fn disused_since(&self) -> Result<Option<Instant>> {
501 let (tx, rx) = oneshot::channel();
502 self.command
503 .unbounded_send(CtrlCmd::GetTunnelActivity { sender: tx })
504 .map_err(|_| Error::CircuitClosed)?;
505
506 Ok(rx.await.map_err(|_| Error::CircuitClosed)?.disused_since())
507 }
508
509 pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
513 let (tx, rx) = oneshot::channel();
514
515 self.control
516 .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
517 .map_err(|_| Error::CircuitClosed)?;
518
519 Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
520 }
521
522 pub fn mq_account(&self) -> &CircuitAccount {
524 &self.memquota
525 }
526
527 #[cfg(feature = "hs-service")]
535 pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
536 let (sender, receiver) = oneshot::channel();
537 let msg = CtrlCmd::GetBindingKey { hop, done: sender };
538 self.command
539 .unbounded_send(msg)
540 .map_err(|_| Error::CircuitClosed)?;
541
542 receiver.await.map_err(|_| Error::CircuitClosed)?
543 }
544
545 pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
548 where
549 Tg: CircTarget,
550 {
551 #![allow(deprecated)]
552
553 if target
565 .protovers()
566 .supports_named_subver(named::RELAY_NTORV3)
567 {
568 self.extend_ntor_v3(target, params).await
569 } else {
570 self.extend_ntor(target, params).await
571 }
572 }
573
574 #[deprecated(since = "1.6.1", note = "Use extend instead.")]
577 pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
578 where
579 Tg: CircTarget,
580 {
581 let key = NtorPublicKey {
582 id: *target
583 .rsa_identity()
584 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
585 pk: *target.ntor_onion_key(),
586 };
587 let mut linkspecs = target
588 .linkspecs()
589 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
590 if !params.extend_by_ed25519_id {
591 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
592 }
593
594 let (tx, rx) = oneshot::channel();
595
596 let peer_id = OwnedChanTarget::from_chan_target(target);
597 let settings = HopSettings::from_params_and_caps(
598 HopNegotiationType::None,
599 ¶ms,
600 target.protovers(),
601 )?;
602 self.control
603 .unbounded_send(CtrlMsg::ExtendNtor {
604 peer_id,
605 public_key: key,
606 linkspecs,
607 settings,
608 done: tx,
609 })
610 .map_err(|_| Error::CircuitClosed)?;
611
612 rx.await.map_err(|_| Error::CircuitClosed)??;
613
614 Ok(())
615 }
616
617 #[deprecated(since = "1.6.1", note = "Use extend instead.")]
620 pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
621 where
622 Tg: CircTarget,
623 {
624 let key = NtorV3PublicKey {
625 id: *target
626 .ed_identity()
627 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
628 pk: *target.ntor_onion_key(),
629 };
630 let mut linkspecs = target
631 .linkspecs()
632 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
633 if !params.extend_by_ed25519_id {
634 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
635 }
636
637 let (tx, rx) = oneshot::channel();
638
639 let peer_id = OwnedChanTarget::from_chan_target(target);
640 let settings = HopSettings::from_params_and_caps(
641 HopNegotiationType::Full,
642 ¶ms,
643 target.protovers(),
644 )?;
645 self.control
646 .unbounded_send(CtrlMsg::ExtendNtorV3 {
647 peer_id,
648 public_key: key,
649 linkspecs,
650 settings,
651 done: tx,
652 })
653 .map_err(|_| Error::CircuitClosed)?;
654
655 rx.await.map_err(|_| Error::CircuitClosed)??;
656
657 Ok(())
658 }
659
660 #[cfg(feature = "hs-common")]
684 pub async fn extend_virtual(
685 &self,
686 protocol: handshake::RelayProtocol,
687 role: handshake::HandshakeRole,
688 seed: impl handshake::KeyGenerator,
689 params: &CircParameters,
690 capabilities: &tor_protover::Protocols,
691 ) -> Result<()> {
692 use self::handshake::BoxedClientLayer;
693
694 let negotiation_type = match protocol {
696 handshake::RelayProtocol::HsV3 => HopNegotiationType::HsV3,
697 };
698 let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
699
700 let BoxedClientLayer { fwd, back, binding } =
701 protocol.construct_client_layers(role, seed)?;
702
703 let settings = HopSettings::from_params_and_caps(negotiation_type, params, capabilities)?;
704 let (tx, rx) = oneshot::channel();
705 let message = CtrlCmd::ExtendVirtual {
706 cell_crypto: (fwd, back, binding),
707 settings,
708 done: tx,
709 };
710
711 self.command
712 .unbounded_send(message)
713 .map_err(|_| Error::CircuitClosed)?;
714
715 rx.await.map_err(|_| Error::CircuitClosed)?
716 }
717
718 #[cfg(feature = "circ-padding-manual")]
722 pub async fn start_padding_at_hop(&self, hop: HopNum, padder: CircuitPadder) -> Result<()> {
723 self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), Some(padder))
724 .await
725 }
726
727 #[cfg(feature = "circ-padding-manual")]
731 pub async fn stop_padding_at_hop(&self, hop: HopNum) -> Result<()> {
732 self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), None)
733 .await
734 }
735
736 #[cfg(feature = "circ-padding-manual")]
738 pub(super) async fn set_padder_impl(
739 &self,
740 hop: crate::HopLocation,
741 padder: Option<CircuitPadder>,
742 ) -> Result<()> {
743 let (tx, rx) = oneshot::channel();
744 let msg = CtrlCmd::SetPadder {
745 hop,
746 padder,
747 sender: tx,
748 };
749 self.command
750 .unbounded_send(msg)
751 .map_err(|_| Error::CircuitClosed)?;
752 rx.await.map_err(|_| Error::CircuitClosed)?
753 }
754
755 pub fn is_closing(&self) -> bool {
757 self.control.is_closed()
758 }
759
760 pub fn unique_id(&self) -> UniqId {
762 self.unique_id
763 }
764
765 pub fn n_hops(&self) -> Result<usize> {
772 self.mutable
773 .n_hops(self.unique_id)
774 .map_err(|_| Error::CircuitClosed)
775 }
776
777 pub fn wait_for_close(
784 &self,
785 ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
786 self.reactor_closed_rx.clone().map(|_| ())
787 }
788}
789
790impl PendingClientTunnel {
791 #[allow(clippy::too_many_arguments)]
795 pub(crate) fn new(
796 id: CircId,
797 channel: Arc<Channel>,
798 createdreceiver: oneshot::Receiver<CreateResponse>,
799 input: CircuitRxReceiver,
800 unique_id: UniqId,
801 runtime: DynTimeProvider,
802 memquota: CircuitAccount,
803 padding_ctrl: PaddingController,
804 padding_stream: PaddingEventStream,
805 timeouts: Arc<dyn TimeoutEstimator>,
806 ) -> (PendingClientTunnel, crate::client::reactor::Reactor) {
807 let time_provider = channel.time_provider().clone();
808 let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) = Reactor::new(
809 channel,
810 id,
811 unique_id,
812 input,
813 runtime,
814 memquota.clone(),
815 padding_ctrl,
816 padding_stream,
817 timeouts,
818 );
819
820 let circuit = ClientCirc {
821 mutable,
822 unique_id,
823 control: control_tx,
824 command: command_tx,
825 reactor_closed_rx: reactor_closed_rx.shared(),
826 #[cfg(test)]
827 circid: id,
828 memquota,
829 time_provider,
830 is_multi_path: false,
831 };
832
833 let pending = PendingClientTunnel {
834 recvcreated: createdreceiver,
835 circ: circuit,
836 };
837 (pending, reactor)
838 }
839
840 pub fn peek_unique_id(&self) -> UniqId {
842 self.circ.unique_id
843 }
844
845 pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<ClientTunnel> {
852 let protocols = tor_protover::Protocols::new();
857 let settings =
858 HopSettings::from_params_and_caps(HopNegotiationType::None, ¶ms, &protocols)?;
859 let (tx, rx) = oneshot::channel();
860 self.circ
861 .control
862 .unbounded_send(CtrlMsg::Create {
863 recv_created: self.recvcreated,
864 handshake: CircuitHandshake::CreateFast,
865 settings,
866 done: tx,
867 })
868 .map_err(|_| Error::CircuitClosed)?;
869
870 rx.await.map_err(|_| Error::CircuitClosed)??;
871
872 self.circ.into_tunnel()
873 }
874
875 pub async fn create_firsthop<Tg>(
880 self,
881 target: &Tg,
882 params: CircParameters,
883 ) -> Result<ClientTunnel>
884 where
885 Tg: tor_linkspec::CircTarget,
886 {
887 #![allow(deprecated)]
888 if target
890 .protovers()
891 .supports_named_subver(named::RELAY_NTORV3)
892 {
893 self.create_firsthop_ntor_v3(target, params).await
894 } else {
895 self.create_firsthop_ntor(target, params).await
896 }
897 }
898
899 #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
904 pub async fn create_firsthop_ntor<Tg>(
905 self,
906 target: &Tg,
907 params: CircParameters,
908 ) -> Result<ClientTunnel>
909 where
910 Tg: tor_linkspec::CircTarget,
911 {
912 let (tx, rx) = oneshot::channel();
913 let settings = HopSettings::from_params_and_caps(
914 HopNegotiationType::None,
915 ¶ms,
916 target.protovers(),
917 )?;
918
919 self.circ
920 .control
921 .unbounded_send(CtrlMsg::Create {
922 recv_created: self.recvcreated,
923 handshake: CircuitHandshake::Ntor {
924 public_key: NtorPublicKey {
925 id: *target
926 .rsa_identity()
927 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
928 pk: *target.ntor_onion_key(),
929 },
930 ed_identity: *target
931 .ed_identity()
932 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
933 },
934 settings,
935 done: tx,
936 })
937 .map_err(|_| Error::CircuitClosed)?;
938
939 rx.await.map_err(|_| Error::CircuitClosed)??;
940
941 self.circ.into_tunnel()
942 }
943
944 #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
953 pub async fn create_firsthop_ntor_v3<Tg>(
954 self,
955 target: &Tg,
956 params: CircParameters,
957 ) -> Result<ClientTunnel>
958 where
959 Tg: tor_linkspec::CircTarget,
960 {
961 let settings = HopSettings::from_params_and_caps(
962 HopNegotiationType::Full,
963 ¶ms,
964 target.protovers(),
965 )?;
966 let (tx, rx) = oneshot::channel();
967
968 self.circ
969 .control
970 .unbounded_send(CtrlMsg::Create {
971 recv_created: self.recvcreated,
972 handshake: CircuitHandshake::NtorV3 {
973 public_key: NtorV3PublicKey {
974 id: *target
975 .ed_identity()
976 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
977 pk: *target.ntor_onion_key(),
978 },
979 },
980 settings,
981 done: tx,
982 })
983 .map_err(|_| Error::CircuitClosed)?;
984
985 rx.await.map_err(|_| Error::CircuitClosed)??;
986
987 self.circ.into_tunnel()
988 }
989}
990
991#[cfg(test)]
992pub(crate) mod test {
993 #![allow(clippy::bool_assert_comparison)]
995 #![allow(clippy::clone_on_copy)]
996 #![allow(clippy::dbg_macro)]
997 #![allow(clippy::mixed_attributes_style)]
998 #![allow(clippy::print_stderr)]
999 #![allow(clippy::print_stdout)]
1000 #![allow(clippy::single_char_pattern)]
1001 #![allow(clippy::unwrap_used)]
1002 #![allow(clippy::unchecked_time_subtraction)]
1003 #![allow(clippy::useless_vec)]
1004 #![allow(clippy::needless_pass_by_value)]
1005 #![allow(clippy::string_slice)] use super::*;
1009 use crate::channel::test::{CodecResult, new_reactor};
1010 use crate::circuit::CircuitRxSender;
1011 use crate::circuit::reactor::test::rmsg_to_ccmsg;
1012 use crate::circuit::test::fake_mpsc;
1013 use crate::client::circuit::padding::new_padding;
1014 use crate::client::stream::DataStream;
1015 use crate::congestion::params::CongestionControlParams;
1016 use crate::congestion::test_utils::params::build_cc_vegas_params;
1017 use crate::crypto::cell::RelayCellBody;
1018 use crate::crypto::handshake::ntor_v3::NtorV3Server;
1019 use crate::memquota::SpecificAccount as _;
1020 use crate::stream::flow_ctrl::params::FlowCtrlParameters;
1021 use crate::util::DummyTimeoutEstimator;
1022 use assert_matches::assert_matches;
1023 use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1024 use futures::channel::mpsc::{Receiver, Sender};
1025 use futures::io::{AsyncReadExt, AsyncWriteExt};
1026 use futures::sink::SinkExt;
1027 use futures::stream::StreamExt;
1028 use hex_literal::hex;
1029 use std::collections::{HashMap, VecDeque};
1030 use std::fmt::Debug;
1031 use std::time::Duration;
1032 use tor_basic_utils::test_rng::testing_rng;
1033 use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanCell, ChanCmd, msg as chanmsg};
1034 use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1035 use tor_cell::relaycell::msg::SendmeTag;
1036 use tor_cell::relaycell::{
1037 AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg, msg::AnyRelayMsg,
1038 };
1039 use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
1040 use tor_linkspec::OwnedCircTarget;
1041 use tor_rtcompat::Runtime;
1042 use tor_rtcompat::SpawnExt;
1043 use tracing::trace;
1044 use tracing_test::traced_test;
1045
1046 #[cfg(feature = "conflux")]
1047 use {
1048 crate::client::reactor::ConfluxHandshakeResult,
1049 crate::util::err::ConfluxHandshakeError,
1050 futures::future::FusedFuture,
1051 futures::lock::Mutex as AsyncMutex,
1052 std::pin::Pin,
1053 std::result::Result as StdResult,
1054 tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1055 tor_cell::relaycell::msg::ConfluxLink,
1056 tor_rtmock::MockRuntime,
1057 };
1058
1059 #[cfg(feature = "hs-service")]
1060 use crate::circuit::reactor::test::AllowAllStreamsFilter;
1061
1062 impl PendingClientTunnel {
1063 pub(crate) fn peek_circid(&self) -> CircId {
1065 self.circ.circid
1066 }
1067 }
1068
1069 impl ClientCirc {
1070 pub(crate) fn peek_circid(&self) -> CircId {
1072 self.circid
1073 }
1074 }
1075
1076 impl ClientTunnel {
1077 pub(crate) async fn resolve_last_hop(&self) -> TargetHop {
1078 let (sender, receiver) = oneshot::channel();
1079 let _ =
1080 self.as_single_circ()
1081 .unwrap()
1082 .command
1083 .unbounded_send(CtrlCmd::ResolveTargetHop {
1084 hop: TargetHop::LastHop,
1085 done: sender,
1086 });
1087 TargetHop::Hop(receiver.await.unwrap().unwrap())
1088 }
1089 }
1090
1091 const EXAMPLE_SK: [u8; 32] =
1093 hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
1094 const EXAMPLE_PK: [u8; 32] =
1095 hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
1096 const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
1097 const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1098
1099 fn example_target() -> OwnedCircTarget {
1101 let mut builder = OwnedCircTarget::builder();
1102 builder
1103 .chan_target()
1104 .ed_identity(EXAMPLE_ED_ID.into())
1105 .rsa_identity(EXAMPLE_RSA_ID.into());
1106 builder
1107 .ntor_onion_key(EXAMPLE_PK.into())
1108 .protocols("FlowCtrl=1-2".parse().unwrap())
1109 .build()
1110 .unwrap()
1111 }
1112 fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1113 crate::crypto::handshake::ntor::NtorSecretKey::new(
1114 EXAMPLE_SK.into(),
1115 EXAMPLE_PK.into(),
1116 EXAMPLE_RSA_ID.into(),
1117 )
1118 }
1119 fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1120 crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1121 EXAMPLE_SK.into(),
1122 EXAMPLE_PK.into(),
1123 EXAMPLE_ED_ID.into(),
1124 )
1125 }
1126
1127 fn working_fake_channel<R: Runtime>(
1128 rt: &R,
1129 ) -> (Arc<Channel>, Receiver<AnyChanCell>, Sender<CodecResult>) {
1130 let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1131 rt.spawn(async {
1132 let _ignore = chan_reactor.run().await;
1133 })
1134 .unwrap();
1135 (channel, rx, tx)
1136 }
1137
1138 #[derive(Copy, Clone)]
1140 enum HandshakeType {
1141 Fast,
1142 Ntor,
1143 NtorV3,
1144 }
1145
1146 #[allow(deprecated)]
1147 async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1148 use crate::crypto::handshake::{ServerHandshake, fast::CreateFastServer, ntor::NtorServer};
1152
1153 let (chan, mut rx, _sink) = working_fake_channel(rt);
1154 let circid = CircId::new(128).unwrap();
1155 let (created_send, created_recv) = oneshot::channel();
1156 let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1157 let unique_id = UniqId::new(23, 17);
1158 let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
1159
1160 let (pending, reactor) = PendingClientTunnel::new(
1161 circid,
1162 chan,
1163 created_recv,
1164 circmsg_recv,
1165 unique_id,
1166 DynTimeProvider::new(rt.clone()),
1167 CircuitAccount::new_noop(),
1168 padding_ctrl,
1169 padding_stream,
1170 Arc::new(DummyTimeoutEstimator),
1171 );
1172
1173 rt.spawn(async {
1174 let _ignore = reactor.run().await;
1175 })
1176 .unwrap();
1177
1178 let simulate_relay_fut = async move {
1180 let mut rng = testing_rng();
1181 let create_cell = rx.next().await.unwrap();
1182 assert_eq!(create_cell.circid(), Some(circid));
1183 let reply = match handshake_type {
1184 HandshakeType::Fast => {
1185 let cf = match create_cell.msg() {
1186 AnyChanMsg::CreateFast(cf) => cf,
1187 other => panic!("{:?}", other),
1188 };
1189 let (_, rep) = CreateFastServer::server(
1190 &mut rng,
1191 &mut |_: &()| Some(()),
1192 &[()],
1193 cf.handshake(),
1194 )
1195 .unwrap();
1196 CreateResponse::CreatedFast(CreatedFast::new(rep))
1197 }
1198 HandshakeType::Ntor => {
1199 let c2 = match create_cell.msg() {
1200 AnyChanMsg::Create2(c2) => c2,
1201 other => panic!("{:?}", other),
1202 };
1203 let (_, rep) = NtorServer::server(
1204 &mut rng,
1205 &mut |_: &()| Some(()),
1206 &[example_ntor_key()],
1207 c2.body(),
1208 )
1209 .unwrap();
1210 CreateResponse::Created2(Created2::new(rep))
1211 }
1212 HandshakeType::NtorV3 => {
1213 let c2 = match create_cell.msg() {
1214 AnyChanMsg::Create2(c2) => c2,
1215 other => panic!("{:?}", other),
1216 };
1217 let mut reply_fn = if with_cc {
1218 |client_exts: &[CircRequestExt]| {
1219 let _ = client_exts
1220 .iter()
1221 .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1222 .expect("Client failed to request CC");
1223 Some(vec![CircResponseExt::CcResponse(
1226 extend_ext::CcResponse::new(31),
1227 )])
1228 }
1229 } else {
1230 |_: &_| Some(vec![])
1231 };
1232 let (_, rep) = NtorV3Server::server(
1233 &mut rng,
1234 &mut reply_fn,
1235 &[example_ntor_v3_key()],
1236 c2.body(),
1237 )
1238 .unwrap();
1239 CreateResponse::Created2(Created2::new(rep))
1240 }
1241 };
1242 created_send.send(reply).unwrap();
1243 };
1244 let client_fut = async move {
1246 let target = example_target();
1247 let params = CircParameters::default();
1248 let ret = match handshake_type {
1249 HandshakeType::Fast => {
1250 trace!("doing fast create");
1251 pending.create_firsthop_fast(params).await
1252 }
1253 HandshakeType::Ntor => {
1254 trace!("doing ntor create");
1255 pending.create_firsthop_ntor(&target, params).await
1256 }
1257 HandshakeType::NtorV3 => {
1258 let params = if with_cc {
1259 CircParameters::new(
1261 true,
1262 build_cc_vegas_params(),
1263 FlowCtrlParameters::defaults_for_tests(),
1264 )
1265 } else {
1266 params
1267 };
1268 trace!("doing ntor_v3 create");
1269 pending.create_firsthop_ntor_v3(&target, params).await
1270 }
1271 };
1272 trace!("create done: result {:?}", ret);
1273 ret
1274 };
1275
1276 let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1277
1278 let _circ = circ.unwrap();
1279
1280 assert_eq!(_circ.n_hops().unwrap(), 1);
1282 }
1283
1284 #[traced_test]
1285 #[test]
1286 fn test_create_fast() {
1287 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1288 test_create(&rt, HandshakeType::Fast, false).await;
1289 });
1290 }
1291 #[traced_test]
1292 #[test]
1293 fn test_create_ntor() {
1294 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1295 test_create(&rt, HandshakeType::Ntor, false).await;
1296 });
1297 }
1298 #[traced_test]
1299 #[test]
1300 fn test_create_ntor_v3() {
1301 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1302 test_create(&rt, HandshakeType::NtorV3, false).await;
1303 });
1304 }
1305 #[traced_test]
1306 #[test]
1307 #[cfg(feature = "flowctl-cc")]
1308 fn test_create_ntor_v3_with_cc() {
1309 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1310 test_create(&rt, HandshakeType::NtorV3, true).await;
1311 });
1312 }
1313
1314 pub(crate) struct DummyCrypto {
1317 counter_tag: [u8; 20],
1318 counter: u32,
1319 lasthop: bool,
1320 }
1321 impl DummyCrypto {
1322 fn next_tag(&mut self) -> SendmeTag {
1323 #![allow(clippy::identity_op)]
1324 self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1325 self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1326 self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1327 self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1328 self.counter += 1;
1329 self.counter_tag.into()
1330 }
1331 }
1332
1333 impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1334 fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
1335 self.next_tag()
1336 }
1337 fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
1338 }
1339 impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1340 fn decrypt_inbound(
1341 &mut self,
1342 _cmd: ChanCmd,
1343 _cell: &mut RelayCellBody,
1344 ) -> Option<SendmeTag> {
1345 if self.lasthop {
1346 Some(self.next_tag())
1347 } else {
1348 None
1349 }
1350 }
1351 }
1352 impl DummyCrypto {
1353 pub(crate) fn new(lasthop: bool) -> Self {
1354 DummyCrypto {
1355 counter_tag: [0; 20],
1356 counter: 0,
1357 lasthop,
1358 }
1359 }
1360 }
1361
1362 async fn newtunnel_ext<R: Runtime>(
1365 rt: &R,
1366 unique_id: UniqId,
1367 chan: Arc<Channel>,
1368 hops: Vec<path::HopDetail>,
1369 next_msg_from: HopNum,
1370 params: CircParameters,
1371 ) -> (ClientTunnel, CircuitRxSender) {
1372 let circid = CircId::new(128).unwrap();
1373 let (_created_send, created_recv) = oneshot::channel();
1374 let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1375 let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
1376
1377 let (pending, reactor) = PendingClientTunnel::new(
1378 circid,
1379 chan,
1380 created_recv,
1381 circmsg_recv,
1382 unique_id,
1383 DynTimeProvider::new(rt.clone()),
1384 CircuitAccount::new_noop(),
1385 padding_ctrl,
1386 padding_stream,
1387 Arc::new(DummyTimeoutEstimator),
1388 );
1389
1390 rt.spawn(async {
1391 let _ignore = reactor.run().await;
1392 })
1393 .unwrap();
1394 let PendingClientTunnel {
1395 circ,
1396 recvcreated: _,
1397 } = pending;
1398
1399 let relay_cell_format = RelayCellFormat::V0;
1401
1402 let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
1403 for (idx, peer_id) in hops.into_iter().enumerate() {
1404 let (tx, rx) = oneshot::channel();
1405 let idx = idx as u8;
1406
1407 circ.command
1408 .unbounded_send(CtrlCmd::AddFakeHop {
1409 relay_cell_format,
1410 fwd_lasthop: idx == last_hop_num,
1411 rev_lasthop: idx == u8::from(next_msg_from),
1412 peer_id,
1413 params: params.clone(),
1414 done: tx,
1415 })
1416 .unwrap();
1417 rx.await.unwrap().unwrap();
1418 }
1419 (circ.into_tunnel().unwrap(), circmsg_send)
1420 }
1421
1422 async fn newtunnel<R: Runtime>(
1425 rt: &R,
1426 chan: Arc<Channel>,
1427 ) -> (Arc<ClientTunnel>, CircuitRxSender) {
1428 let hops = std::iter::repeat_with(|| {
1429 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1430 .ed_identity([4; 32].into())
1431 .rsa_identity([5; 20].into())
1432 .build()
1433 .expect("Could not construct fake hop");
1434
1435 path::HopDetail::Relay(peer_id)
1436 })
1437 .take(3)
1438 .collect();
1439
1440 let unique_id = UniqId::new(23, 17);
1441 let (tunnel, circmsg_send) = newtunnel_ext(
1442 rt,
1443 unique_id,
1444 chan,
1445 hops,
1446 2.into(),
1447 CircParameters::default(),
1448 )
1449 .await;
1450
1451 (Arc::new(tunnel), circmsg_send)
1452 }
1453
1454 fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
1457 (0..n)
1458 .map(|idx| {
1459 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1460 .ed_identity([idx + start_idx; 32].into())
1461 .rsa_identity([idx + start_idx + 1; 20].into())
1462 .build()
1463 .expect("Could not construct fake hop");
1464
1465 path::HopDetail::Relay(peer_id)
1466 })
1467 .collect()
1468 }
1469
1470 #[allow(deprecated)]
1471 async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1472 use crate::crypto::handshake::{ServerHandshake, ntor::NtorServer};
1473
1474 let (chan, mut rx, _sink) = working_fake_channel(rt);
1475 let (tunnel, mut sink) = newtunnel(rt, chan).await;
1476 let circ = Arc::new(tunnel.as_single_circ().unwrap());
1477 let circid = circ.peek_circid();
1478 let params = CircParameters::default();
1479
1480 let extend_fut = async move {
1481 let target = example_target();
1482 match handshake_type {
1483 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1484 HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1485 HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1486 };
1487 circ };
1489 let reply_fut = async move {
1490 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1493 assert_eq!(id, Some(circid));
1494 let rmsg = match chmsg {
1495 AnyChanMsg::RelayEarly(r) => {
1496 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1497 .unwrap()
1498 }
1499 other => panic!("{:?}", other),
1500 };
1501 let e2 = match rmsg.msg() {
1502 AnyRelayMsg::Extend2(e2) => e2,
1503 other => panic!("{:?}", other),
1504 };
1505 let mut rng = testing_rng();
1506 let reply = match handshake_type {
1507 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1508 HandshakeType::Ntor => {
1509 let (_keygen, reply) = NtorServer::server(
1510 &mut rng,
1511 &mut |_: &()| Some(()),
1512 &[example_ntor_key()],
1513 e2.handshake(),
1514 )
1515 .unwrap();
1516 reply
1517 }
1518 HandshakeType::NtorV3 => {
1519 let (_keygen, reply) = NtorV3Server::server(
1520 &mut rng,
1521 &mut |_: &[CircRequestExt]| Some(vec![]),
1522 &[example_ntor_v3_key()],
1523 e2.handshake(),
1524 )
1525 .unwrap();
1526 reply
1527 }
1528 };
1529
1530 let extended2 = relaymsg::Extended2::new(reply).into();
1531 sink.send(rmsg_to_ccmsg(None, extended2, false))
1532 .await
1533 .unwrap();
1534 (sink, rx) };
1536
1537 let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
1538
1539 assert_eq!(circ.n_hops().unwrap(), 4);
1541
1542 {
1544 let path = circ.single_path().unwrap();
1545 let path = path
1546 .all_hops()
1547 .filter_map(|hop| match hop {
1548 path::HopDetail::Relay(r) => Some(r),
1549 #[cfg(feature = "hs-common")]
1550 path::HopDetail::Virtual => None,
1551 })
1552 .collect::<Vec<_>>();
1553
1554 assert_eq!(path.len(), 4);
1555 use tor_linkspec::HasRelayIds;
1556 assert_eq!(path[3].ed_identity(), example_target().ed_identity());
1557 assert_ne!(path[0].ed_identity(), example_target().ed_identity());
1558 }
1559 {
1560 let path = circ.single_path().unwrap();
1561 assert_eq!(path.n_hops(), 4);
1562 use tor_linkspec::HasRelayIds;
1563 assert_eq!(
1564 path.hops()[3].as_chan_target().unwrap().ed_identity(),
1565 example_target().ed_identity()
1566 );
1567 assert_ne!(
1568 path.hops()[0].as_chan_target().unwrap().ed_identity(),
1569 example_target().ed_identity()
1570 );
1571 }
1572 }
1573
1574 #[traced_test]
1575 #[test]
1576 fn test_extend_ntor() {
1577 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1578 test_extend(&rt, HandshakeType::Ntor).await;
1579 });
1580 }
1581
1582 #[traced_test]
1583 #[test]
1584 fn test_extend_ntor_v3() {
1585 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1586 test_extend(&rt, HandshakeType::NtorV3).await;
1587 });
1588 }
1589
1590 #[allow(deprecated)]
1591 async fn bad_extend_test_impl<R: Runtime>(
1592 rt: &R,
1593 reply_hop: HopNum,
1594 bad_reply: AnyChanMsg,
1595 ) -> Error {
1596 let (chan, mut rx, _sink) = working_fake_channel(rt);
1597 let hops = std::iter::repeat_with(|| {
1598 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1599 .ed_identity([4; 32].into())
1600 .rsa_identity([5; 20].into())
1601 .build()
1602 .expect("Could not construct fake hop");
1603
1604 path::HopDetail::Relay(peer_id)
1605 })
1606 .take(3)
1607 .collect();
1608
1609 let unique_id = UniqId::new(23, 17);
1610 let (tunnel, mut sink) = newtunnel_ext(
1611 rt,
1612 unique_id,
1613 chan,
1614 hops,
1615 reply_hop,
1616 CircParameters::default(),
1617 )
1618 .await;
1619 let params = CircParameters::default();
1620
1621 let target = example_target();
1622 let reply_task_handle = rt
1623 .spawn_with_handle(async move {
1624 let (_circid, chanmsg) = rx.next().await.unwrap().into_circid_and_msg();
1626 let AnyChanMsg::RelayEarly(relay_early) = chanmsg else {
1627 panic!("unexpected message {chanmsg:?}");
1628 };
1629 let relaymsg = UnparsedRelayMsg::from_singleton_body(
1630 RelayCellFormat::V0,
1631 relay_early.into_relay_body(),
1632 )
1633 .unwrap();
1634 assert_eq!(relaymsg.cmd(), RelayCmd::EXTEND2);
1635
1636 sink.send(bad_reply).await.unwrap();
1638 sink
1639 })
1640 .unwrap();
1641 let outcome = tunnel
1642 .as_single_circ()
1643 .unwrap()
1644 .extend_ntor(&target, params)
1645 .await;
1646 let _sink = reply_task_handle.await;
1647
1648 assert_eq!(tunnel.n_hops().unwrap(), 3);
1649 assert!(outcome.is_err());
1650 outcome.unwrap_err()
1651 }
1652
1653 #[traced_test]
1654 #[test]
1655 fn bad_extend_wronghop() {
1656 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1657 let extended2 = relaymsg::Extended2::new(vec![]).into();
1658 let cc = rmsg_to_ccmsg(None, extended2, false);
1659
1660 let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
1661 match error {
1666 Error::CircuitClosed => {}
1667 x => panic!("got other error: {}", x),
1668 }
1669 });
1670 }
1671
1672 #[traced_test]
1673 #[test]
1674 fn bad_extend_wrongtype() {
1675 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1676 let extended = relaymsg::Extended::new(vec![7; 200]).into();
1677 let cc = rmsg_to_ccmsg(None, extended, false);
1678
1679 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1680 match error {
1681 Error::BytesErr {
1682 err: tor_bytes::Error::InvalidMessage(_),
1683 object: "extended2 message",
1684 } => {}
1685 other => panic!("{:?}", other),
1686 }
1687 });
1688 }
1689
1690 #[traced_test]
1691 #[test]
1692 fn bad_extend_destroy() {
1693 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1694 let cc = AnyChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
1695 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1696 match error {
1697 Error::CircuitClosed => {}
1698 other => panic!("{:?}", other),
1699 }
1700 });
1701 }
1702
1703 #[traced_test]
1704 #[test]
1705 fn bad_extend_crypto() {
1706 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1707 let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
1708 let cc = rmsg_to_ccmsg(None, extended2, false);
1709 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1710 assert_matches!(error, Error::BadCircHandshakeAuth);
1711 });
1712 }
1713
1714 #[traced_test]
1715 #[test]
1716 fn begindir() {
1717 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1718 let (chan, mut rx, _sink) = working_fake_channel(&rt);
1719 let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1720 let circ = tunnel.as_single_circ().unwrap();
1721 let circid = circ.peek_circid();
1722
1723 let begin_and_send_fut = async move {
1724 let mut stream = tunnel.begin_dir_stream().await.unwrap();
1727 stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
1728 stream.flush().await.unwrap();
1729 let mut buf = [0_u8; 1024];
1730 let n = stream.read(&mut buf).await.unwrap();
1731 assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
1732 let n = stream.read(&mut buf).await.unwrap();
1733 assert_eq!(n, 0);
1734 stream
1735 };
1736 let reply_fut = async move {
1737 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1740 assert_eq!(id, Some(circid));
1741 let rmsg = match chmsg {
1742 AnyChanMsg::Relay(r) => {
1743 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1744 .unwrap()
1745 }
1746 other => panic!("{:?}", other),
1747 };
1748 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1749 assert_matches!(rmsg, AnyRelayMsg::BeginDir(_));
1750
1751 let connected = relaymsg::Connected::new_empty().into();
1753 sink.send(rmsg_to_ccmsg(streamid, connected, false))
1754 .await
1755 .unwrap();
1756
1757 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1759 assert_eq!(id, Some(circid));
1760 let rmsg = match chmsg {
1761 AnyChanMsg::Relay(r) => {
1762 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1763 .unwrap()
1764 }
1765 other => panic!("{:?}", other),
1766 };
1767 let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
1768 assert_eq!(streamid_2, streamid);
1769 if let AnyRelayMsg::Data(d) = rmsg {
1770 assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
1771 } else {
1772 panic!();
1773 }
1774
1775 let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
1777 .unwrap()
1778 .into();
1779 sink.send(rmsg_to_ccmsg(streamid, data, false))
1780 .await
1781 .unwrap();
1782
1783 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
1785 sink.send(rmsg_to_ccmsg(streamid, end, false))
1786 .await
1787 .unwrap();
1788
1789 (rx, sink) };
1791
1792 let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
1793 });
1794 }
1795
1796 fn close_stream_helper(by_drop: bool) {
1798 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1799 let (chan, mut rx, _sink) = working_fake_channel(&rt);
1800 let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1801
1802 let stream_fut = async move {
1803 let stream = tunnel
1804 .begin_stream("www.example.com", 80, None)
1805 .await
1806 .unwrap();
1807
1808 let (r, mut w) = stream.split();
1809 if by_drop {
1810 drop(r);
1812 drop(w);
1813 (None, tunnel) } else {
1815 w.close().await.unwrap();
1817 (Some(r), tunnel)
1818 }
1819 };
1820 let handler_fut = async {
1821 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1823 let rmsg = match msg {
1824 AnyChanMsg::Relay(r) => {
1825 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1826 .unwrap()
1827 }
1828 other => panic!("{:?}", other),
1829 };
1830 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1831 assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1832
1833 let connected =
1835 relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1836 sink.send(rmsg_to_ccmsg(streamid, connected, false))
1837 .await
1838 .unwrap();
1839
1840 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1842 let rmsg = match msg {
1843 AnyChanMsg::Relay(r) => {
1844 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1845 .unwrap()
1846 }
1847 other => panic!("{:?}", other),
1848 };
1849 let (_, rmsg) = rmsg.into_streamid_and_msg();
1850 assert_eq!(rmsg.cmd(), RelayCmd::END);
1851
1852 (rx, sink) };
1854
1855 let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
1856 });
1857 }
1858
1859 #[traced_test]
1860 #[test]
1861 fn drop_stream() {
1862 close_stream_helper(true);
1863 }
1864
1865 #[traced_test]
1866 #[test]
1867 fn close_stream() {
1868 close_stream_helper(false);
1869 }
1870
1871 #[traced_test]
1872 #[test]
1873 fn expire_halfstreams() {
1874 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1875 let (chan, mut rx, _sink) = working_fake_channel(&rt);
1876 let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1877
1878 let client_fut = async move {
1879 let stream = tunnel
1880 .begin_stream("www.example.com", 80, None)
1881 .await
1882 .unwrap();
1883
1884 let (r, mut w) = stream.split();
1885 w.close().await.unwrap();
1887 (Some(r), tunnel)
1888 };
1889 let exit_fut = async {
1890 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1892 let rmsg = match msg {
1893 AnyChanMsg::Relay(r) => {
1894 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1895 .unwrap()
1896 }
1897 other => panic!("{:?}", other),
1898 };
1899 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1900 assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1901
1902 let connected =
1904 relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1905 sink.send(rmsg_to_ccmsg(streamid, connected, false))
1906 .await
1907 .unwrap();
1908
1909 (rx, streamid, sink) };
1911
1912 let ((_opt_reader, tunnel), (_rx, streamid, mut sink)) =
1913 futures::join!(client_fut, exit_fut);
1914
1915 rt.progress_until_stalled().await;
1918
1919 assert!(!tunnel.is_closed());
1921
1922 let data = relaymsg::Data::new(b"hello").unwrap();
1925 sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data), false))
1926 .await
1927 .unwrap();
1928 rt.progress_until_stalled().await;
1929
1930 assert!(!tunnel.is_closed());
1932
1933 let stream_timeout = DummyTimeoutEstimator.circuit_build_timeout(3);
1938 rt.advance_by(2 * stream_timeout).await;
1939
1940 let data = relaymsg::Data::new(b"hello").unwrap();
1943 sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data), false))
1944 .await
1945 .unwrap();
1946 rt.progress_until_stalled().await;
1947
1948 assert!(tunnel.is_closed());
1950 });
1951 }
1952
1953 async fn setup_incoming_sendme_case<R: Runtime>(
1955 rt: &R,
1956 n_to_send: usize,
1957 ) -> (
1958 Arc<ClientTunnel>,
1959 DataStream,
1960 CircuitRxSender,
1961 Option<StreamId>,
1962 usize,
1963 Receiver<AnyChanCell>,
1964 Sender<CodecResult>,
1965 ) {
1966 let (chan, mut rx, sink2) = working_fake_channel(rt);
1967 let (tunnel, mut sink) = newtunnel(rt, chan).await;
1968 let circid = tunnel.as_single_circ().unwrap().peek_circid();
1969
1970 let begin_and_send_fut = {
1971 let tunnel = tunnel.clone();
1972 async move {
1973 let mut stream = tunnel
1975 .begin_stream("www.example.com", 443, None)
1976 .await
1977 .unwrap();
1978 let junk = [0_u8; 1024];
1979 let mut remaining = n_to_send;
1980 while remaining > 0 {
1981 let n = std::cmp::min(remaining, junk.len());
1982 stream.write_all(&junk[..n]).await.unwrap();
1983 remaining -= n;
1984 }
1985 stream.flush().await.unwrap();
1986 stream
1987 }
1988 };
1989
1990 let receive_fut = async move {
1991 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1993 let rmsg = match chmsg {
1994 AnyChanMsg::Relay(r) => {
1995 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1996 .unwrap()
1997 }
1998 other => panic!("{:?}", other),
1999 };
2000 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2001 assert_matches!(rmsg, AnyRelayMsg::Begin(_));
2002 let connected = relaymsg::Connected::new_empty().into();
2004 sink.send(rmsg_to_ccmsg(streamid, connected, false))
2005 .await
2006 .unwrap();
2007 let mut bytes_received = 0_usize;
2009 let mut cells_received = 0_usize;
2010 while bytes_received < n_to_send {
2011 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2013 assert_eq!(id, Some(circid));
2014
2015 let rmsg = match chmsg {
2016 AnyChanMsg::Relay(r) => {
2017 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2018 .unwrap()
2019 }
2020 other => panic!("{:?}", other),
2021 };
2022 let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2023 assert_eq!(streamid2, streamid);
2024 if let AnyRelayMsg::Data(dat) = rmsg {
2025 cells_received += 1;
2026 bytes_received += dat.as_ref().len();
2027 } else {
2028 panic!();
2029 }
2030 }
2031
2032 (sink, streamid, cells_received, rx)
2033 };
2034
2035 let (stream, (sink, streamid, cells_received, rx)) =
2036 futures::join!(begin_and_send_fut, receive_fut);
2037
2038 (tunnel, stream, sink, streamid, cells_received, rx, sink2)
2039 }
2040
2041 #[traced_test]
2042 #[test]
2043 fn accept_valid_sendme() {
2044 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2045 let (tunnel, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2046 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2047 let circ = tunnel.as_single_circ().unwrap();
2048
2049 assert_eq!(cells_received, 301);
2050
2051 {
2053 let (tx, rx) = oneshot::channel();
2054 circ.command
2055 .unbounded_send(CtrlCmd::QuerySendWindow {
2056 hop: 2.into(),
2057 leg: tunnel.unique_id(),
2058 done: tx,
2059 })
2060 .unwrap();
2061 let (window, tags) = rx.await.unwrap().unwrap();
2062 assert_eq!(window, 1000 - 301);
2063 assert_eq!(tags.len(), 3);
2064 assert_eq!(
2066 tags[0],
2067 SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2068 );
2069 assert_eq!(
2071 tags[1],
2072 SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2073 );
2074 assert_eq!(
2076 tags[2],
2077 SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2078 );
2079 }
2080
2081 let reply_with_sendme_fut = async move {
2082 let c_sendme =
2084 relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2085 .into();
2086 sink.send(rmsg_to_ccmsg(None, c_sendme, false))
2087 .await
2088 .unwrap();
2089
2090 let s_sendme = relaymsg::Sendme::new_empty().into();
2092 sink.send(rmsg_to_ccmsg(streamid, s_sendme, false))
2093 .await
2094 .unwrap();
2095
2096 sink
2097 };
2098
2099 let _sink = reply_with_sendme_fut.await;
2100
2101 rt.advance_until_stalled().await;
2102
2103 {
2106 let (tx, rx) = oneshot::channel();
2107 circ.command
2108 .unbounded_send(CtrlCmd::QuerySendWindow {
2109 hop: 2.into(),
2110 leg: tunnel.unique_id(),
2111 done: tx,
2112 })
2113 .unwrap();
2114 let (window, _tags) = rx.await.unwrap().unwrap();
2115 assert_eq!(window, 1000 - 201);
2116 }
2117 });
2118 }
2119
2120 #[traced_test]
2121 #[test]
2122 fn invalid_circ_sendme() {
2123 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2124 let (tunnel, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2128 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2129
2130 let reply_with_sendme_fut = async move {
2131 let c_sendme =
2133 relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2134 .into();
2135 sink.send(rmsg_to_ccmsg(None, c_sendme, false))
2136 .await
2137 .unwrap();
2138 sink
2139 };
2140
2141 let _sink = reply_with_sendme_fut.await;
2142
2143 rt.advance_until_stalled().await;
2145 assert!(tunnel.is_closed());
2146 });
2147 }
2148
2149 #[traced_test]
2150 #[test]
2151 fn test_busy_stream_fairness() {
2152 const N_STREAMS: usize = 3;
2154 const N_CELLS: usize = 20;
2156 const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2159 const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2166 N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2167
2168 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2169 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2170 let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2171
2172 rt.spawn({
2178 let tunnel = tunnel.clone();
2181 async move {
2182 let mut clients = VecDeque::new();
2183 struct Client {
2184 stream: DataStream,
2185 to_write: &'static [u8],
2186 }
2187 for _ in 0..N_STREAMS {
2188 clients.push_back(Client {
2189 stream: tunnel
2190 .begin_stream("www.example.com", 80, None)
2191 .await
2192 .unwrap(),
2193 to_write: &[0_u8; N_BYTES][..],
2194 });
2195 }
2196 while let Some(mut client) = clients.pop_front() {
2197 if client.to_write.is_empty() {
2198 continue;
2200 }
2201 let written = client.stream.write(client.to_write).await.unwrap();
2202 client.to_write = &client.to_write[written..];
2203 clients.push_back(client);
2204 }
2205 }
2206 })
2207 .unwrap();
2208
2209 let channel_handler_fut = async {
2210 let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2211 let mut total_bytes_received = 0;
2212
2213 loop {
2214 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2215 let rmsg = match msg {
2216 AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2217 RelayCellFormat::V0,
2218 r.into_relay_body(),
2219 )
2220 .unwrap(),
2221 other => panic!("Unexpected chanmsg: {other:?}"),
2222 };
2223 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2224 match rmsg.cmd() {
2225 RelayCmd::BEGIN => {
2226 let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2228 assert_eq!(prev, None);
2229 let connected = relaymsg::Connected::new_with_addr(
2231 "10.0.0.1".parse().unwrap(),
2232 1234,
2233 )
2234 .into();
2235 sink.send(rmsg_to_ccmsg(streamid, connected, false))
2236 .await
2237 .unwrap();
2238 }
2239 RelayCmd::DATA => {
2240 let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2241 let nbytes = data_msg.as_ref().len();
2242 total_bytes_received += nbytes;
2243 let streamid = streamid.unwrap();
2244 let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2245 *stream_bytes += nbytes;
2246 if total_bytes_received >= N_BYTES {
2247 break;
2248 }
2249 }
2250 RelayCmd::END => {
2251 continue;
2256 }
2257 other => {
2258 panic!("Unexpected command {other:?}");
2259 }
2260 }
2261 }
2262
2263 (total_bytes_received, stream_bytes_received, rx, sink)
2266 };
2267
2268 let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2269 channel_handler_fut.await;
2270 assert_eq!(stream_bytes_received.len(), N_STREAMS);
2271 for (sid, stream_bytes) in stream_bytes_received {
2272 assert!(
2273 stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2274 "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2275 );
2276 }
2277 });
2278 }
2279
2280 #[test]
2281 fn basic_params() {
2282 use super::CircParameters;
2283 let mut p = CircParameters::default();
2284 assert!(p.extend_by_ed25519_id);
2285
2286 p.extend_by_ed25519_id = false;
2287 assert!(!p.extend_by_ed25519_id);
2288 }
2289
2290 #[traced_test]
2291 #[test]
2292 #[cfg(feature = "hs-service")]
2293 fn allow_stream_requests_twice() {
2294 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2295 let (chan, _rx, _sink) = working_fake_channel(&rt);
2296 let (tunnel, _send) = newtunnel(&rt, chan).await;
2297
2298 let _incoming = tunnel
2299 .allow_stream_requests(
2300 &[tor_cell::relaycell::RelayCmd::BEGIN],
2301 tunnel.resolve_last_hop().await,
2302 AllowAllStreamsFilter,
2303 )
2304 .await
2305 .unwrap();
2306
2307 let incoming = tunnel
2308 .allow_stream_requests(
2309 &[tor_cell::relaycell::RelayCmd::BEGIN],
2310 tunnel.resolve_last_hop().await,
2311 AllowAllStreamsFilter,
2312 )
2313 .await;
2314
2315 assert!(incoming.is_err());
2317 });
2318 }
2319
2320 #[traced_test]
2321 #[test]
2322 #[cfg(feature = "hs-service")]
2323 fn allow_stream_requests() {
2324 use tor_cell::relaycell::msg::BeginFlags;
2325
2326 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2327 const TEST_DATA: &[u8] = b"ping";
2328
2329 let (chan, _rx, _sink) = working_fake_channel(&rt);
2330 let (tunnel, mut send) = newtunnel(&rt, chan).await;
2331
2332 let rfmt = RelayCellFormat::V0;
2333
2334 let (tx, rx) = oneshot::channel();
2336 let mut incoming = tunnel
2337 .allow_stream_requests(
2338 &[tor_cell::relaycell::RelayCmd::BEGIN],
2339 tunnel.resolve_last_hop().await,
2340 AllowAllStreamsFilter,
2341 )
2342 .await
2343 .unwrap();
2344
2345 let simulate_service = async move {
2346 let stream = incoming.next().await.unwrap();
2347 let mut data_stream = stream
2348 .accept_data(relaymsg::Connected::new_empty())
2349 .await
2350 .unwrap();
2351 tx.send(()).unwrap();
2353
2354 let mut buf = [0_u8; TEST_DATA.len()];
2356 data_stream.read_exact(&mut buf).await.unwrap();
2357 assert_eq!(&buf, TEST_DATA);
2358
2359 tunnel
2360 };
2361
2362 let simulate_client = async move {
2363 let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2364 let body: BoxedCellBody =
2365 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2366 .encode(rfmt, &mut testing_rng())
2367 .unwrap();
2368 let begin_msg = chanmsg::Relay::from(body);
2369
2370 send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();
2372
2373 rx.await.unwrap();
2379 let data = relaymsg::Data::new(TEST_DATA).unwrap();
2381 let body: BoxedCellBody =
2382 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2383 .encode(rfmt, &mut testing_rng())
2384 .unwrap();
2385 let data_msg = chanmsg::Relay::from(body);
2386
2387 send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
2388 send
2389 };
2390
2391 let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2392 });
2393 }
2394
2395 #[traced_test]
2396 #[test]
2397 #[cfg(feature = "hs-service")]
2398 fn accept_stream_after_reject() {
2399 use tor_cell::relaycell::msg::AnyRelayMsg;
2400 use tor_cell::relaycell::msg::BeginFlags;
2401 use tor_cell::relaycell::msg::EndReason;
2402
2403 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2404 const TEST_DATA: &[u8] = b"ping";
2405 const STREAM_COUNT: usize = 2;
2406 let rfmt = RelayCellFormat::V0;
2407
2408 let (chan, _rx, _sink) = working_fake_channel(&rt);
2409 let (tunnel, mut send) = newtunnel(&rt, chan).await;
2410
2411 let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
2413
2414 let mut incoming = tunnel
2415 .allow_stream_requests(
2416 &[tor_cell::relaycell::RelayCmd::BEGIN],
2417 tunnel.resolve_last_hop().await,
2418 AllowAllStreamsFilter,
2419 )
2420 .await
2421 .unwrap();
2422
2423 let simulate_service = async move {
2424 for i in 0..STREAM_COUNT {
2426 let stream = incoming.next().await.unwrap();
2427
2428 if i == 0 {
2430 stream
2431 .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
2432 .await
2433 .unwrap();
2434 tx.send(()).await.unwrap();
2436 continue;
2437 }
2438
2439 let mut data_stream = stream
2440 .accept_data(relaymsg::Connected::new_empty())
2441 .await
2442 .unwrap();
2443 tx.send(()).await.unwrap();
2445
2446 let mut buf = [0_u8; TEST_DATA.len()];
2448 data_stream.read_exact(&mut buf).await.unwrap();
2449 assert_eq!(&buf, TEST_DATA);
2450 }
2451
2452 tunnel
2453 };
2454
2455 let simulate_client = async move {
2456 let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2457 let body: BoxedCellBody =
2458 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2459 .encode(rfmt, &mut testing_rng())
2460 .unwrap();
2461 let begin_msg = chanmsg::Relay::from(body);
2462
2463 for _ in 0..STREAM_COUNT {
2466 send.send(AnyChanMsg::Relay(begin_msg.clone()))
2467 .await
2468 .unwrap();
2469
2470 rx.next().await.unwrap();
2472 }
2473
2474 let data = relaymsg::Data::new(TEST_DATA).unwrap();
2476 let body: BoxedCellBody =
2477 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2478 .encode(rfmt, &mut testing_rng())
2479 .unwrap();
2480 let data_msg = chanmsg::Relay::from(body);
2481
2482 send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
2483 send
2484 };
2485
2486 let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2487 });
2488 }
2489
2490 #[traced_test]
2491 #[test]
2492 #[cfg(feature = "hs-service")]
2493 fn incoming_stream_bad_hop() {
2494 use tor_cell::relaycell::msg::BeginFlags;
2495
2496 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2497 const EXPECTED_HOP: u8 = 1;
2499 let rfmt = RelayCellFormat::V0;
2500
2501 let (chan, _rx, _sink) = working_fake_channel(&rt);
2502 let (tunnel, mut send) = newtunnel(&rt, chan).await;
2503
2504 let mut incoming = tunnel
2506 .allow_stream_requests(
2507 &[tor_cell::relaycell::RelayCmd::BEGIN],
2508 (
2510 tunnel.as_single_circ().unwrap().unique_id(),
2511 EXPECTED_HOP.into(),
2512 )
2513 .into(),
2514 AllowAllStreamsFilter,
2515 )
2516 .await
2517 .unwrap();
2518
2519 let simulate_service = async move {
2520 assert!(incoming.next().await.is_none());
2523 tunnel
2524 };
2525
2526 let simulate_client = async move {
2527 let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2528 let body: BoxedCellBody =
2529 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2530 .encode(rfmt, &mut testing_rng())
2531 .unwrap();
2532 let begin_msg = chanmsg::Relay::from(body);
2533
2534 send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();
2536
2537 send
2538 };
2539
2540 let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2541 });
2542 }
2543
2544 #[traced_test]
2545 #[test]
2546 #[cfg(feature = "conflux")]
2547 fn multipath_circ_validation() {
2548 use std::error::Error as _;
2549
2550 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2551 let params = CircParameters::default();
2552 let invalid_tunnels = [
2553 setup_bad_conflux_tunnel(&rt).await,
2554 setup_conflux_tunnel(&rt, true, params).await,
2555 ];
2556
2557 for tunnel in invalid_tunnels {
2558 let TestTunnelCtx {
2559 tunnel: _tunnel,
2560 circs: _circs,
2561 conflux_link_rx,
2562 } = tunnel;
2563
2564 let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
2565 let err_src = conflux_hs_err.source().unwrap();
2566
2567 assert!(
2570 err_src
2571 .to_string()
2572 .contains("one more conflux circuits are invalid")
2573 );
2574 }
2575 });
2576 }
2577
2578 #[derive(Debug)]
2582 #[allow(unused)]
2583 #[cfg(feature = "conflux")]
2584 struct TestCircuitCtx {
2585 chan_rx: Receiver<AnyChanCell>,
2586 chan_tx: Sender<std::result::Result<AnyChanCell, Error>>,
2587 circ_tx: CircuitRxSender,
2588 unique_id: UniqId,
2589 }
2590
2591 #[derive(Debug)]
2592 #[cfg(feature = "conflux")]
2593 struct TestTunnelCtx {
2594 tunnel: Arc<ClientTunnel>,
2595 circs: Vec<TestCircuitCtx>,
2596 conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
2597 }
2598
2599 #[cfg(feature = "conflux")]
2601 async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
2602 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2604 let rmsg = match chmsg {
2605 AnyChanMsg::Relay(r) => {
2606 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2607 .unwrap()
2608 }
2609 other => panic!("{:?}", other),
2610 };
2611 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2612
2613 let link = match rmsg {
2614 AnyRelayMsg::ConfluxLink(link) => link,
2615 _ => panic!("unexpected relay message {rmsg:?}"),
2616 };
2617
2618 assert!(streamid.is_none());
2619
2620 link
2621 }
2622
2623 #[cfg(feature = "conflux")]
2624 async fn setup_conflux_tunnel(
2625 rt: &MockRuntime,
2626 same_hops: bool,
2627 params: CircParameters,
2628 ) -> TestTunnelCtx {
2629 let hops1 = hop_details(3, 0);
2630 let hops2 = if same_hops {
2631 hops1.clone()
2632 } else {
2633 hop_details(3, 10)
2634 };
2635
2636 let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
2637 let (mut tunnel1, sink1) = newtunnel_ext(
2638 rt,
2639 UniqId::new(1, 3),
2640 chan1,
2641 hops1,
2642 2.into(),
2643 params.clone(),
2644 )
2645 .await;
2646
2647 let (chan2, rx2, chan_sink2) = working_fake_channel(rt);
2648
2649 let (tunnel2, sink2) =
2650 newtunnel_ext(rt, UniqId::new(2, 4), chan2, hops2, 2.into(), params).await;
2651
2652 let (answer_tx, answer_rx) = oneshot::channel();
2653 tunnel2
2654 .as_single_circ()
2655 .unwrap()
2656 .command
2657 .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
2658 .unwrap();
2659
2660 let circuit = answer_rx.await.unwrap().unwrap();
2661 rt.advance_until_stalled().await;
2663 assert!(tunnel2.is_closed());
2664
2665 let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
2666 tunnel1
2668 .as_single_circ()
2669 .unwrap()
2670 .control
2671 .unbounded_send(CtrlMsg::LinkCircuits {
2672 circuits: vec![circuit],
2673 answer: conflux_link_tx,
2674 })
2675 .unwrap();
2676
2677 let circ_ctx1 = TestCircuitCtx {
2678 chan_rx: rx1,
2679 chan_tx: chan_sink1,
2680 circ_tx: sink1,
2681 unique_id: tunnel1.unique_id(),
2682 };
2683
2684 let circ_ctx2 = TestCircuitCtx {
2685 chan_rx: rx2,
2686 chan_tx: chan_sink2,
2687 circ_tx: sink2,
2688 unique_id: tunnel2.unique_id(),
2689 };
2690
2691 tunnel1.circ.is_multi_path = true;
2697 TestTunnelCtx {
2698 tunnel: Arc::new(tunnel1),
2699 circs: vec![circ_ctx1, circ_ctx2],
2700 conflux_link_rx,
2701 }
2702 }
2703
2704 #[cfg(feature = "conflux")]
2705 async fn setup_good_conflux_tunnel(
2706 rt: &MockRuntime,
2707 cc_params: CongestionControlParams,
2708 ) -> TestTunnelCtx {
2709 let same_hops = true;
2715 let flow_ctrl_params = FlowCtrlParameters::defaults_for_tests();
2716 let params = CircParameters::new(true, cc_params, flow_ctrl_params);
2717 setup_conflux_tunnel(rt, same_hops, params).await
2718 }
2719
2720 #[cfg(feature = "conflux")]
2721 async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
2722 let same_hops = false;
2726 let flow_ctrl_params = FlowCtrlParameters::defaults_for_tests();
2727 let params = CircParameters::new(true, build_cc_vegas_params(), flow_ctrl_params);
2728 setup_conflux_tunnel(rt, same_hops, params).await
2729 }
2730
2731 #[traced_test]
2732 #[test]
2733 #[cfg(feature = "conflux")]
2734 fn reject_conflux_linked_before_hs() {
2735 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2736 let (chan, mut _rx, _sink) = working_fake_channel(&rt);
2737 let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2738
2739 let nonce = V1Nonce::new(&mut testing_rng());
2740 let payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2741 let linked = relaymsg::ConfluxLinked::new(payload).into();
2743 sink.send(rmsg_to_ccmsg(None, linked, false)).await.unwrap();
2744
2745 rt.advance_until_stalled().await;
2746 assert!(tunnel.is_closed());
2747 });
2748 }
2749
2750 #[traced_test]
2751 #[test]
2752 #[cfg(feature = "conflux")]
2753 fn conflux_hs_timeout() {
2754 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2755 let TestTunnelCtx {
2756 tunnel: _tunnel,
2757 circs,
2758 conflux_link_rx,
2759 } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2760
2761 let [mut circ1, _circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2762
2763 let link = await_link_payload(&mut circ1.chan_rx).await;
2765
2766 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2768 circ1
2769 .circ_tx
2770 .send(rmsg_to_ccmsg(None, linked, false))
2771 .await
2772 .unwrap();
2773
2774 rt.advance_by(Duration::from_secs(60)).await;
2776
2777 let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2778
2779 let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
2781 conflux_hs_res.try_into().unwrap();
2782
2783 assert!(res1.is_ok());
2784
2785 let err = res2.unwrap_err();
2786 assert_matches!(err, ConfluxHandshakeError::Timeout);
2787 });
2788 }
2789
2790 #[traced_test]
2791 #[test]
2792 #[cfg(feature = "conflux")]
2793 fn conflux_bad_hs() {
2794 use crate::util::err::ConfluxHandshakeError;
2795
2796 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2797 let nonce = V1Nonce::new(&mut testing_rng());
2798 let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2799 let bad_hs_responses = [
2801 (
2802 rmsg_to_ccmsg(
2803 None,
2804 relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
2805 false,
2806 ),
2807 "Received CONFLUX_LINKED cell with mismatched nonce",
2808 ),
2809 (
2810 rmsg_to_ccmsg(
2811 None,
2812 relaymsg::ConfluxLink::new(bad_link_payload).into(),
2813 false,
2814 ),
2815 "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
2816 ),
2817 (
2818 rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into(), false),
2819 "Received CONFLUX_SWITCH on unlinked circuit?!",
2820 ),
2821 ];
2830
2831 for (bad_cell, expected_err) in bad_hs_responses {
2832 let TestTunnelCtx {
2833 tunnel,
2834 circs,
2835 conflux_link_rx,
2836 } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2837
2838 let [mut _circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2839
2840 circ2.circ_tx.send(bad_cell).await.unwrap();
2842
2843 let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2844 let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
2848 conflux_hs_res.try_into().unwrap();
2849
2850 match res2.unwrap_err() {
2851 ConfluxHandshakeError::Link(Error::CircProto(e)) => {
2852 assert_eq!(e, expected_err);
2853 }
2854 e => panic!("unexpected error: {e:?}"),
2855 }
2856
2857 assert!(tunnel.is_closed());
2858 }
2859 });
2860 }
2861
2862 #[traced_test]
2863 #[test]
2864 #[cfg(feature = "conflux")]
2865 fn unexpected_conflux_cell() {
2866 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2867 let nonce = V1Nonce::new(&mut testing_rng());
2868 let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2869 let bad_cells = [
2870 rmsg_to_ccmsg(
2871 None,
2872 relaymsg::ConfluxLinked::new(link_payload.clone()).into(),
2873 false,
2874 ),
2875 rmsg_to_ccmsg(
2876 None,
2877 relaymsg::ConfluxLink::new(link_payload.clone()).into(),
2878 false,
2879 ),
2880 rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into(), false),
2881 ];
2882
2883 for bad_cell in bad_cells {
2884 let (chan, mut _rx, _sink) = working_fake_channel(&rt);
2885 let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2886
2887 sink.send(bad_cell).await.unwrap();
2888 rt.advance_until_stalled().await;
2889
2890 assert!(tunnel.is_closed());
2894 }
2895 });
2896 }
2897
2898 #[traced_test]
2899 #[test]
2900 #[cfg(feature = "conflux")]
2901 fn conflux_bad_linked() {
2902 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2903 let TestTunnelCtx {
2904 tunnel,
2905 circs,
2906 conflux_link_rx: _,
2907 } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2908
2909 let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2910
2911 let link = await_link_payload(&mut circ1.chan_rx).await;
2912
2913 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2915 circ1
2916 .circ_tx
2917 .send(rmsg_to_ccmsg(None, linked, false))
2918 .await
2919 .unwrap();
2920
2921 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2923 circ2
2924 .circ_tx
2925 .send(rmsg_to_ccmsg(None, linked, false))
2926 .await
2927 .unwrap();
2928 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2929 circ2
2930 .circ_tx
2931 .send(rmsg_to_ccmsg(None, linked, false))
2932 .await
2933 .unwrap();
2934
2935 rt.advance_until_stalled().await;
2936
2937 assert!(tunnel.is_closed());
2940 });
2941 }
2942
2943 #[traced_test]
2944 #[test]
2945 #[cfg(feature = "conflux")]
2946 fn conflux_bad_switch() {
2947 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2948 let cc_vegas_params = build_cc_vegas_params();
2949 let cwnd_init = cc_vegas_params.cwnd_params().cwnd_init();
2950 let bad_switch = [
2951 relaymsg::ConfluxSwitch::new(0),
2953 relaymsg::ConfluxSwitch::new(cwnd_init + 1),
2956 ];
2957
2958 for bad_cell in bad_switch {
2959 let TestTunnelCtx {
2960 tunnel,
2961 circs,
2962 conflux_link_rx,
2963 } = setup_good_conflux_tunnel(&rt, cc_vegas_params.clone()).await;
2964
2965 let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2966
2967 let link = await_link_payload(&mut circ1.chan_rx).await;
2968
2969 for circ in [&mut circ1, &mut circ2] {
2971 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2972 circ.circ_tx
2973 .send(rmsg_to_ccmsg(None, linked, false))
2974 .await
2975 .unwrap();
2976 }
2977
2978 let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2979 assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
2980
2981 let msg = rmsg_to_ccmsg(None, bad_cell.clone().into(), false);
2984 circ1.circ_tx.send(msg).await.unwrap();
2985
2986 rt.advance_until_stalled().await;
2988 assert!(tunnel.is_closed());
2989 }
2990 });
2991 }
2992
2993 #[traced_test]
2994 #[test]
2995 #[cfg(feature = "conflux")]
2996 fn conflux_consecutive_switch() {
2997 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2998 let TestTunnelCtx {
2999 tunnel,
3000 circs,
3001 conflux_link_rx,
3002 } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3003
3004 let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3005
3006 let link = await_link_payload(&mut circ1.chan_rx).await;
3007
3008 for circ in [&mut circ1, &mut circ2] {
3010 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3011 circ.circ_tx
3012 .send(rmsg_to_ccmsg(None, linked, false))
3013 .await
3014 .unwrap();
3015 }
3016
3017 let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3018 assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
3019
3020 let switch1 = relaymsg::ConfluxSwitch::new(10);
3022 let msg = rmsg_to_ccmsg(None, switch1.into(), false);
3023 circ1.circ_tx.send(msg).await.unwrap();
3024
3025 rt.advance_until_stalled().await;
3027 assert!(!tunnel.is_closed());
3028
3029 let switch2 = relaymsg::ConfluxSwitch::new(12);
3031 let msg = rmsg_to_ccmsg(None, switch2.into(), false);
3032 circ1.circ_tx.send(msg).await.unwrap();
3033
3034 rt.advance_until_stalled().await;
3037 assert!(tunnel.is_closed());
3038 });
3039 }
3040
3041 #[traced_test]
3044 #[test]
3045 #[cfg(feature = "conflux")]
3046 fn shutdown_and_return_circ_multipath() {
3047 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3048 let TestTunnelCtx {
3049 tunnel,
3050 circs,
3051 conflux_link_rx: _,
3052 } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3053
3054 rt.progress_until_stalled().await;
3055
3056 let (answer_tx, answer_rx) = oneshot::channel();
3057 tunnel
3058 .circ
3059 .command
3060 .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
3061 .unwrap();
3062
3063 #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
3065 let err = answer_rx
3066 .await
3067 .unwrap()
3068 .map(|_| {
3069 ()
3072 })
3073 .unwrap_err();
3074
3075 const MSG: &str = "not a single leg conflux set (got at least 2 elements when exactly one was expected)";
3076 assert!(err.to_string().contains(MSG), "{err}");
3077
3078 rt.progress_until_stalled().await;
3081 assert!(tunnel.is_closed());
3082
3083 drop(circs);
3086 });
3087 }
3088
3089 #[cfg(feature = "conflux")]
3091 #[derive(Debug)]
3092 enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
3093 Relay(ConfluxExitState<I>),
3095 Client {
3097 conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3099 tunnel: Arc<ClientTunnel>,
3101 send_data: Vec<u8>,
3103 recv_data: Vec<u8>,
3105 },
3106 }
3107
3108 #[allow(unused, clippy::large_enum_variant)]
3111 #[derive(Debug)]
3112 #[cfg(feature = "conflux")]
3113 enum ConfluxEndpointResult {
3114 Circuit {
3115 tunnel: Arc<ClientTunnel>,
3116 stream: DataStream,
3117 },
3118 Relay {
3119 circ: TestCircuitCtx,
3120 },
3121 }
3122
3123 #[derive(Debug)]
3125 #[cfg(feature = "conflux")]
3126 struct ConfluxStreamState {
3127 data_recvd: Vec<u8>,
3129 expected_data_len: usize,
3131 begin_recvd: bool,
3133 end_recvd: bool,
3135 end_sent: bool,
3137 }
3138
3139 #[cfg(feature = "conflux")]
3140 impl ConfluxStreamState {
3141 fn new(expected_data_len: usize) -> Self {
3142 Self {
3143 data_recvd: vec![],
3144 expected_data_len,
3145 begin_recvd: false,
3146 end_recvd: false,
3147 end_sent: false,
3148 }
3149 }
3150 }
3151
3152 #[derive(Debug)]
3155 #[cfg(feature = "conflux")]
3156 struct ExpectedSwitch {
3157 cells_so_far: usize,
3160 seqno: u32,
3162 }
3163
3164 #[cfg(feature = "conflux")]
3170 struct CellDispatcher {
3171 leg_tx: HashMap<UniqId, mpsc::Sender<CellToSend>>,
3173 cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
3175 }
3176
3177 #[cfg(feature = "conflux")]
3178 impl CellDispatcher {
3179 async fn run(mut self) {
3180 while !self.cells_to_send.is_empty() {
3181 let (circ_id, cell) = self.cells_to_send.remove(0);
3182 let cell_tx = self.leg_tx.get_mut(&circ_id).unwrap();
3183 let (done_tx, done_rx) = oneshot::channel();
3184 cell_tx.send(CellToSend { done_tx, cell }).await.unwrap();
3185 let () = done_rx.await.unwrap();
3187 }
3188 }
3189 }
3190
3191 #[cfg(feature = "conflux")]
3193 #[derive(Debug)]
3194 struct CellToSend {
3195 done_tx: oneshot::Sender<()>,
3197 cell: AnyRelayMsg,
3199 }
3200
3201 #[derive(Debug)]
3203 #[cfg(feature = "conflux")]
3204 struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
3205 runtime: Arc<AsyncMutex<MockRuntime>>,
3212 tunnel: Arc<ClientTunnel>,
3214 circ: TestCircuitCtx,
3216 rtt_delays: I,
3220 stream_state: Arc<Mutex<ConfluxStreamState>>,
3223 expect_switch: Vec<ExpectedSwitch>,
3226 event_rx: mpsc::Receiver<MockExitEvent>,
3228 event_tx: mpsc::Sender<MockExitEvent>,
3230 is_sending_leg: bool,
3232 cells_rx: mpsc::Receiver<CellToSend>,
3234 }
3235
3236 #[cfg(feature = "conflux")]
3237 async fn good_exit_handshake(
3238 runtime: &Arc<AsyncMutex<MockRuntime>>,
3239 init_rtt_delay: Option<Duration>,
3240 rx: &mut Receiver<ChanCell<AnyChanMsg>>,
3241 sink: &mut CircuitRxSender,
3242 ) {
3243 let link = await_link_payload(rx).await;
3245
3246 if let Some(init_rtt_delay) = init_rtt_delay {
3249 runtime.lock().await.advance_by(init_rtt_delay).await;
3250 }
3251
3252 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3254 sink.send(rmsg_to_ccmsg(None, linked, false)).await.unwrap();
3255
3256 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
3258 let rmsg = match chmsg {
3259 AnyChanMsg::Relay(r) => {
3260 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3261 .unwrap()
3262 }
3263 other => panic!("{other:?}"),
3264 };
3265 let (_streamid, rmsg) = rmsg.into_streamid_and_msg();
3266
3267 assert_matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_));
3268 }
3269
3270 #[derive(Copy, Clone, Debug)]
3272 enum MockExitEvent {
3273 Done,
3275 BeginRecvd(StreamId),
3277 }
3278
3279 #[cfg(feature = "conflux")]
3280 async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
3281 state: ConfluxExitState<I>,
3282 ) -> ConfluxEndpointResult {
3283 let ConfluxExitState {
3284 runtime,
3285 tunnel,
3286 mut circ,
3287 rtt_delays,
3288 stream_state,
3289 mut expect_switch,
3290 mut event_tx,
3291 mut event_rx,
3292 is_sending_leg,
3293 mut cells_rx,
3294 } = state;
3295
3296 let mut rtt_delays = rtt_delays.into_iter();
3297
3298 let stream_len = stream_state.lock().unwrap().expected_data_len;
3300 let mut data_cells_received = 0_usize;
3301 let mut cell_count = 0_usize;
3302 let mut tags = vec![];
3303 let mut streamid = None;
3304 let mut done_writing = false;
3305
3306 loop {
3307 let should_exit = {
3308 let stream_state = stream_state.lock().unwrap();
3309 let done_reading = stream_state.data_recvd.len() >= stream_len;
3310
3311 (stream_state.begin_recvd || stream_state.end_recvd) && done_reading && done_writing
3312 };
3313
3314 if should_exit {
3315 break;
3316 }
3317
3318 use futures::select;
3319
3320 let mut next_cell = if streamid.is_some() && !done_writing {
3323 Box::pin(cells_rx.next().fuse())
3324 as Pin<Box<dyn FusedFuture<Output = Option<CellToSend>> + Send>>
3325 } else {
3326 Box::pin(std::future::pending().fuse())
3327 };
3328
3329 let res = select! {
3332 res = circ.chan_rx.next() => {
3333 res.unwrap()
3334 },
3335 res = event_rx.next() => {
3336 let Some(event) = res else {
3337 break;
3338 };
3339
3340 match event {
3341 MockExitEvent::Done => {
3342 break;
3343 },
3344 MockExitEvent::BeginRecvd(id) => {
3345 streamid = Some(id);
3348 continue;
3349 },
3350 }
3351 }
3352 res = next_cell => {
3353 if let Some(cell_to_send) = res {
3354 let CellToSend { cell, done_tx } = cell_to_send;
3355
3356 let streamid = if matches!(cell, AnyRelayMsg::ConfluxSwitch(_)) {
3358 None
3359 } else {
3360 streamid
3361 };
3362
3363 circ.circ_tx
3364 .send(rmsg_to_ccmsg(streamid, cell, false))
3365 .await
3366 .unwrap();
3367
3368 runtime.lock().await.advance_until_stalled().await;
3369 done_tx.send(()).unwrap();
3370 } else {
3371 done_writing = true;
3372 }
3373
3374 continue;
3375 }
3376 };
3377
3378 let (_id, chmsg) = res.into_circid_and_msg();
3379 cell_count += 1;
3380 let rmsg = match chmsg {
3381 AnyChanMsg::Relay(r) => {
3382 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3383 .unwrap()
3384 }
3385 other => panic!("{:?}", other),
3386 };
3387 let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
3388 if streamid.is_none() {
3389 streamid = new_streamid;
3390 }
3391
3392 let begin_recvd = stream_state.lock().unwrap().begin_recvd;
3393 let end_recvd = stream_state.lock().unwrap().end_recvd;
3394 match rmsg {
3395 AnyRelayMsg::Begin(_) if begin_recvd => {
3396 panic!("client tried to open two streams?!");
3397 }
3398 AnyRelayMsg::Begin(_) if !begin_recvd => {
3399 stream_state.lock().unwrap().begin_recvd = true;
3400 let connected = relaymsg::Connected::new_empty().into();
3402 circ.circ_tx
3403 .send(rmsg_to_ccmsg(streamid, connected, false))
3404 .await
3405 .unwrap();
3406 event_tx
3408 .send(MockExitEvent::BeginRecvd(streamid.unwrap()))
3409 .await
3410 .unwrap();
3411 }
3412 AnyRelayMsg::End(_) if !end_recvd => {
3413 stream_state.lock().unwrap().end_recvd = true;
3414 break;
3415 }
3416 AnyRelayMsg::End(_) if end_recvd => {
3417 panic!("received two END cells for the same stream?!");
3418 }
3419 AnyRelayMsg::ConfluxSwitch(cell) => {
3420 let expected = expect_switch.remove(0);
3422
3423 assert_eq!(expected.cells_so_far, cell_count);
3424 assert_eq!(expected.seqno, cell.seqno());
3425
3426 continue;
3432 }
3433 AnyRelayMsg::Data(dat) => {
3434 data_cells_received += 1;
3435 stream_state
3436 .lock()
3437 .unwrap()
3438 .data_recvd
3439 .extend_from_slice(dat.as_ref());
3440
3441 let is_next_cell_sendme = data_cells_received.is_multiple_of(31);
3442 if is_next_cell_sendme {
3443 if tags.is_empty() {
3444 runtime.lock().await.advance_until_stalled().await;
3449 let (tx, rx) = oneshot::channel();
3450 tunnel
3451 .circ
3452 .command
3453 .unbounded_send(CtrlCmd::QuerySendWindow {
3454 hop: 2.into(),
3455 leg: circ.unique_id,
3456 done: tx,
3457 })
3458 .unwrap();
3459
3460 let (_window, new_tags) = rx.await.unwrap().unwrap();
3462 tags = new_tags;
3463 }
3464
3465 let tag = tags.remove(0);
3466
3467 if let Some(rtt_delay) = rtt_delays.next().flatten() {
3470 runtime.lock().await.advance_by(rtt_delay).await;
3471 }
3472 let sendme = relaymsg::Sendme::from(tag).into();
3474
3475 circ.circ_tx
3476 .send(rmsg_to_ccmsg(None, sendme, false))
3477 .await
3478 .unwrap();
3479 }
3480 }
3481 _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
3482 }
3483 }
3484
3485 let end_recvd = stream_state.lock().unwrap().end_recvd;
3486
3487 if is_sending_leg && !end_recvd {
3489 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
3490 circ.circ_tx
3491 .send(rmsg_to_ccmsg(streamid, end, false))
3492 .await
3493 .unwrap();
3494 stream_state.lock().unwrap().end_sent = true;
3495 }
3496
3497 let _ = event_tx.send(MockExitEvent::Done).await;
3499
3500 assert!(
3502 expect_switch.is_empty(),
3503 "expect_switch = {expect_switch:?}"
3504 );
3505
3506 ConfluxEndpointResult::Relay { circ }
3507 }
3508
3509 #[cfg(feature = "conflux")]
3510 async fn run_conflux_client(
3511 tunnel: Arc<ClientTunnel>,
3512 conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3513 send_data: Vec<u8>,
3514 recv_data: Vec<u8>,
3515 ) -> ConfluxEndpointResult {
3516 let res = conflux_link_rx.await;
3517
3518 let res = res.unwrap().unwrap();
3519 assert_eq!(res.len(), 2);
3520
3521 let mut stream = tunnel
3526 .begin_stream("www.example.com", 443, None)
3527 .await
3528 .unwrap();
3529
3530 stream.write_all(&send_data).await.unwrap();
3531 stream.flush().await.unwrap();
3532
3533 let mut recv: Vec<u8> = Vec::new();
3534 let recv_len = stream.read_to_end(&mut recv).await.unwrap();
3535 assert_eq!(recv_len, recv_data.len());
3536 assert_eq!(recv_data, recv);
3537
3538 ConfluxEndpointResult::Circuit { tunnel, stream }
3539 }
3540
3541 #[cfg(feature = "conflux")]
3542 async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
3543 endpoint: ConfluxTestEndpoint<I>,
3544 ) -> ConfluxEndpointResult {
3545 match endpoint {
3546 ConfluxTestEndpoint::Relay(state) => run_mock_conflux_exit(state).await,
3547 ConfluxTestEndpoint::Client {
3548 tunnel,
3549 conflux_link_rx,
3550 send_data,
3551 recv_data,
3552 } => run_conflux_client(tunnel, conflux_link_rx, send_data, recv_data).await,
3553 }
3554 }
3555
3556 #[traced_test]
3574 #[test]
3575 #[cfg(feature = "conflux")]
3576 fn multipath_client_to_exit() {
3577 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3578 const NUM_CELLS: usize = 300;
3580 const CELL_SIZE: usize = 498;
3582
3583 let TestTunnelCtx {
3584 tunnel,
3585 circs,
3586 conflux_link_rx,
3587 } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3588 let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3589
3590 let mut send_data = (0..255_u8)
3592 .cycle()
3593 .take(NUM_CELLS * CELL_SIZE)
3594 .collect::<Vec<_>>();
3595 let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
3596
3597 let mut tasks = vec![];
3598
3599 let (tx1, rx1) = mpsc::channel(1);
3602 let (tx2, rx2) = mpsc::channel(1);
3603
3604 let circ1_rtt_delays = [
3609 Some(Duration::from_millis(100)),
3611 Some(Duration::from_millis(500)),
3615 Some(Duration::from_millis(700)),
3616 Some(Duration::from_millis(900)),
3617 Some(Duration::from_millis(1100)),
3618 Some(Duration::from_millis(1300)),
3619 Some(Duration::from_millis(1500)),
3620 Some(Duration::from_millis(1700)),
3621 Some(Duration::from_millis(1900)),
3622 Some(Duration::from_millis(2100)),
3623 ]
3624 .into_iter();
3625
3626 let circ2_rtt_delays = [
3627 Some(Duration::from_millis(200)),
3628 Some(Duration::from_millis(400)),
3629 Some(Duration::from_millis(600)),
3630 Some(Duration::from_millis(800)),
3631 Some(Duration::from_millis(1000)),
3632 Some(Duration::from_millis(1200)),
3633 Some(Duration::from_millis(1400)),
3634 Some(Duration::from_millis(1600)),
3635 Some(Duration::from_millis(1800)),
3636 Some(Duration::from_millis(2000)),
3637 ]
3638 .into_iter();
3639
3640 let expected_switches1 = vec![ExpectedSwitch {
3641 cells_so_far: 126,
3649 seqno: 124,
3658 }];
3659
3660 let expected_switches2 = vec![ExpectedSwitch {
3661 cells_so_far: 1,
3664 seqno: 125,
3666 }];
3667
3668 let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
3669
3670 let (_, cells_rx1) = mpsc::channel(1);
3673 let (_, cells_rx2) = mpsc::channel(1);
3674
3675 let relay1 = ConfluxExitState {
3676 runtime: Arc::clone(&relay_runtime),
3677 tunnel: Arc::clone(&tunnel),
3678 circ: circ1,
3679 rtt_delays: circ1_rtt_delays,
3680 stream_state: Arc::clone(&stream_state),
3681 expect_switch: expected_switches1,
3682 event_tx: tx1,
3683 event_rx: rx2,
3684 is_sending_leg: true,
3685 cells_rx: cells_rx1,
3686 };
3687
3688 let relay2 = ConfluxExitState {
3689 runtime: Arc::clone(&relay_runtime),
3690 tunnel: Arc::clone(&tunnel),
3691 circ: circ2,
3692 rtt_delays: circ2_rtt_delays,
3693 stream_state: Arc::clone(&stream_state),
3694 expect_switch: expected_switches2,
3695 event_tx: tx2,
3696 event_rx: rx1,
3697 is_sending_leg: false,
3698 cells_rx: cells_rx2,
3699 };
3700
3701 for mut mock_relay in [relay1, relay2] {
3702 let leg = mock_relay.circ.unique_id;
3703
3704 good_exit_handshake(
3712 &relay_runtime,
3713 mock_relay.rtt_delays.next().flatten(),
3714 &mut mock_relay.circ.chan_rx,
3715 &mut mock_relay.circ.circ_tx,
3716 )
3717 .await;
3718
3719 let relay = ConfluxTestEndpoint::Relay(mock_relay);
3720
3721 tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
3722 }
3723
3724 tasks.push(rt.spawn_join(
3725 "client task".to_string(),
3726 run_conflux_endpoint(ConfluxTestEndpoint::Client {
3727 tunnel,
3728 conflux_link_rx,
3729 send_data: send_data.clone(),
3730 recv_data: vec![],
3731 }),
3732 ));
3733 let _sinks = futures::future::join_all(tasks).await;
3734 let mut stream_state = stream_state.lock().unwrap();
3735 assert!(stream_state.begin_recvd);
3736
3737 stream_state.data_recvd.sort();
3738 send_data.sort();
3739 assert_eq!(stream_state.data_recvd, send_data);
3740 });
3741 }
3742
3743 #[cfg(feature = "conflux")]
3754 async fn run_multipath_exit_to_client_test(
3755 rt: MockRuntime,
3756 tunnel: TestTunnelCtx,
3757 cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
3758 send_data: Vec<u8>,
3759 recv_data: Vec<u8>,
3760 ) -> Arc<Mutex<ConfluxStreamState>> {
3761 let TestTunnelCtx {
3762 tunnel,
3763 circs,
3764 conflux_link_rx,
3765 } = tunnel;
3766 let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3767
3768 let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
3769
3770 let mut tasks = vec![];
3771 let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
3772 let (cells_tx1, cells_rx1) = mpsc::channel(1);
3773 let (cells_tx2, cells_rx2) = mpsc::channel(1);
3774
3775 let dispatcher = CellDispatcher {
3776 leg_tx: [(circ1.unique_id, cells_tx1), (circ2.unique_id, cells_tx2)]
3777 .into_iter()
3778 .collect(),
3779 cells_to_send,
3780 };
3781
3782 let (tx1, rx1) = mpsc::channel(1);
3785 let (tx2, rx2) = mpsc::channel(1);
3786
3787 let relay1 = ConfluxExitState {
3788 runtime: Arc::clone(&relay_runtime),
3789 tunnel: Arc::clone(&tunnel),
3790 circ: circ1,
3791 rtt_delays: [].into_iter(),
3792 stream_state: Arc::clone(&stream_state),
3793 expect_switch: vec![],
3795 event_tx: tx1,
3796 event_rx: rx2,
3797 is_sending_leg: false,
3798 cells_rx: cells_rx1,
3799 };
3800
3801 let relay2 = ConfluxExitState {
3802 runtime: Arc::clone(&relay_runtime),
3803 tunnel: Arc::clone(&tunnel),
3804 circ: circ2,
3805 rtt_delays: [].into_iter(),
3806 stream_state: Arc::clone(&stream_state),
3807 expect_switch: vec![],
3809 event_tx: tx2,
3810 event_rx: rx1,
3811 is_sending_leg: true,
3812 cells_rx: cells_rx2,
3813 };
3814
3815 rt.spawn(dispatcher.run()).unwrap();
3820
3821 for mut mock_relay in [relay1, relay2] {
3822 let leg = mock_relay.circ.unique_id;
3823
3824 good_exit_handshake(
3825 &relay_runtime,
3826 mock_relay.rtt_delays.next().flatten(),
3827 &mut mock_relay.circ.chan_rx,
3828 &mut mock_relay.circ.circ_tx,
3829 )
3830 .await;
3831
3832 let relay = ConfluxTestEndpoint::Relay(mock_relay);
3833
3834 tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
3835 }
3836
3837 tasks.push(rt.spawn_join(
3838 "client task".to_string(),
3839 run_conflux_endpoint(ConfluxTestEndpoint::Client {
3840 tunnel,
3841 conflux_link_rx,
3842 send_data: send_data.clone(),
3843 recv_data,
3844 }),
3845 ));
3846
3847 let _sinks = futures::future::join_all(tasks).await;
3849
3850 stream_state
3851 }
3852
3853 #[traced_test]
3854 #[test]
3855 #[cfg(feature = "conflux")]
3856 fn multipath_exit_to_client() {
3857 const TO_SEND: &[u8] =
3859 b"But something about Buster Friendly irritated John Isidore, one specific thing";
3860
3861 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3862 const CIRC1: usize = 0;
3864 const CIRC2: usize = 1;
3865
3866 let simple_switch = vec![
3890 (CIRC1, relaymsg::Data::new(&TO_SEND[0..5]).unwrap().into()),
3891 (CIRC1, relaymsg::Data::new(&TO_SEND[5..10]).unwrap().into()),
3892 (CIRC2, relaymsg::ConfluxSwitch::new(4).into()),
3894 (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
3896 (CIRC1, relaymsg::Data::new(&TO_SEND[10..20]).unwrap().into()),
3899 (CIRC2, relaymsg::Data::new(&TO_SEND[30..40]).unwrap().into()),
3900 (CIRC2, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
3901 ];
3902
3903 let multiple_switches = vec![
3950 (CIRC2, relaymsg::ConfluxSwitch::new(3).into()),
3953 (CIRC2, relaymsg::Data::new(&TO_SEND[15..20]).unwrap().into()),
3955 (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
3956 (CIRC1, relaymsg::Data::new(&TO_SEND[0..10]).unwrap().into()),
3958 (CIRC1, relaymsg::Data::new(&TO_SEND[10..15]).unwrap().into()),
3959 (CIRC1, relaymsg::ConfluxSwitch::new(3).into()),
3961 (CIRC1, relaymsg::Data::new(&TO_SEND[31..40]).unwrap().into()),
3963 (CIRC2, relaymsg::Data::new(&TO_SEND[30..31]).unwrap().into()),
3965 (CIRC1, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
3967 (CIRC2, relaymsg::ConfluxSwitch::new(2).into()),
3969 ];
3970
3971 let tests = [simple_switch, multiple_switches];
3977
3978 for cells_to_send in tests {
3979 let tunnel = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3980 assert_eq!(tunnel.circs.len(), 2);
3981 let circ_ids = [tunnel.circs[0].unique_id, tunnel.circs[1].unique_id];
3982 let cells_to_send = cells_to_send
3983 .into_iter()
3984 .map(|(i, cell)| (circ_ids[i], cell))
3985 .collect();
3986
3987 let send_data = vec![];
3989 let stream_state = run_multipath_exit_to_client_test(
3990 rt.clone(),
3991 tunnel,
3992 cells_to_send,
3993 send_data.clone(),
3994 TO_SEND.into(),
3995 )
3996 .await;
3997 let stream_state = stream_state.lock().unwrap();
3998 assert!(stream_state.begin_recvd);
3999 assert!(stream_state.data_recvd.is_empty());
4001 }
4002 });
4003 }
4004
4005 #[traced_test]
4006 #[test]
4007 #[cfg(all(feature = "conflux", feature = "hs-service"))]
4008 fn conflux_incoming_stream() {
4009 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
4010 use std::error::Error as _;
4011
4012 const EXPECTED_HOP: u8 = 1;
4013
4014 let TestTunnelCtx {
4015 tunnel,
4016 circs,
4017 conflux_link_rx,
4018 } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
4019
4020 let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
4021
4022 let link = await_link_payload(&mut circ1.chan_rx).await;
4023 for circ in [&mut circ1, &mut circ2] {
4024 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
4025 circ.circ_tx
4026 .send(rmsg_to_ccmsg(None, linked, false))
4027 .await
4028 .unwrap();
4029 }
4030
4031 let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
4032 assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
4033
4034 let err = tunnel
4036 .allow_stream_requests(
4037 &[tor_cell::relaycell::RelayCmd::BEGIN],
4038 (tunnel.circ.unique_id(), EXPECTED_HOP.into()).into(),
4039 AllowAllStreamsFilter,
4040 )
4041 .await
4042 .map(|_| ())
4044 .unwrap_err();
4045
4046 let err_src = err.source().unwrap().to_string();
4047 assert!(
4048 err_src.contains("Cannot allow stream requests on a multi-path tunnel"),
4049 "{err_src}"
4050 );
4051 });
4052 }
4053
4054 #[test]
4055 fn client_circ_chan_msg() {
4056 use tor_cell::chancell::msg::{self, AnyChanMsg};
4057 fn good(m: AnyChanMsg) {
4058 assert!(ClientCircChanMsg::try_from(m).is_ok());
4059 }
4060 fn bad(m: AnyChanMsg) {
4061 assert!(ClientCircChanMsg::try_from(m).is_err());
4062 }
4063
4064 good(msg::Destroy::new(2.into()).into());
4065 bad(msg::CreatedFast::new(&b"guaranteed in this world"[..]).into());
4066 bad(msg::Created2::new(&b"and the next"[..]).into());
4067 good(msg::Relay::new(&b"guaranteed guaranteed"[..]).into());
4068 bad(msg::AnyChanMsg::RelayEarly(
4069 msg::Relay::new(&b"for the world and its mother"[..]).into(),
4070 ));
4071 bad(msg::Versions::new([1, 2, 3]).unwrap().into());
4072 }
4073}