1pub(crate) mod halfcirc;
39
40#[cfg(feature = "hs-common")]
41pub mod handshake;
42#[cfg(not(feature = "hs-common"))]
43pub(crate) mod handshake;
44
45pub(crate) mod padding;
46
47pub(super) mod path;
48
49use crate::channel::Channel;
50use crate::circuit::circhop::{HopNegotiationType, HopSettings};
51use crate::circuit::{CircuitRxReceiver, celltypes::*};
52#[cfg(feature = "circ-padding-manual")]
53use crate::client::CircuitPadder;
54use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
55use crate::client::reactor::{CircuitHandshake, CtrlCmd, CtrlMsg, Reactor};
56use crate::crypto::cell::HopNum;
57use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
58use crate::memquota::CircuitAccount;
59use crate::util::skew::ClockSkew;
60use crate::{Error, Result};
61use derive_deftly::Deftly;
62use educe::Educe;
63use path::HopDetail;
64use tor_cell::chancell::{
65 CircId,
66 msg::{self as chanmsg},
67};
68use tor_error::{bad_api_usage, internal, into_internal};
69use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
70use tor_protover::named;
71use tor_rtcompat::DynTimeProvider;
72
73use crate::circuit::UniqId;
74
75use super::{ClientTunnel, TargetHop};
76
77use futures::channel::mpsc;
78use oneshot_fused_workaround as oneshot;
79
80use futures::FutureExt as _;
81use std::collections::HashMap;
82use std::sync::{Arc, Mutex};
83use tor_memquota::derive_deftly_template_HasMemoryCost;
84
85use crate::crypto::handshake::ntor::NtorPublicKey;
86
87#[cfg(test)]
88use crate::stream::{StreamMpscReceiver, StreamMpscSender};
89
90pub use crate::crypto::binding::CircuitBinding;
91pub use path::{Path, PathEntry};
92
/// A buffer size associated with circuits.
///
/// NOTE(review): nothing in the visible part of this file reads this
/// constant; presumably it is the capacity of a queue between a circuit and
/// its reactor -- confirm at the use sites.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
95
96pub use crate::circuit::CircParameters;
98
99pub use crate::util::timeout::TimeoutEstimator;
101
/// The set of channel messages handled for a client circuit.
///
/// The `RestrictedChanMsgSet` derive is given the usage string
/// "on an open client circuit"; only the variants listed here belong to the
/// set.
#[derive(Debug, Deftly)]
// NOTE(review): the reason for allowing `unreachable_pub` is not visible
// here; presumably the derives generate `pub` items for this `pub(super)`
// type -- confirm.
#[allow(unreachable_pub)]
#[derive_deftly(HasMemoryCost)]
#[derive_deftly(RestrictedChanMsgSet)]
#[deftly(usage = "on an open client circuit")]
pub(super) enum ClientCircChanMsg {
    /// A `Relay` channel message.
    Relay(chanmsg::Relay),
    /// A `Destroy` channel message.
    Destroy(chanmsg::Destroy),
}
117
/// Client-side handle to a circuit whose work is done by a separate
/// [`Reactor`].
///
/// The handle does not manipulate the circuit directly: its methods send
/// [`CtrlMsg`] and [`CtrlCmd`] values to the reactor over unbounded channels
/// and, where an answer is expected, wait for it on a oneshot channel.
#[derive(Debug)]
pub struct ClientCirc {
    /// Per-circuit mutable state, keyed by unique ID.
    ///
    /// This value is handed back by `Reactor::new` when the circuit is
    /// constructed.
    pub(super) mutable: Arc<TunnelMutableState>,
    /// The unique identifier of this circuit; used as the key into
    /// `mutable`.
    unique_id: UniqId,
    /// Sender for [`CtrlMsg`] values addressed to the reactor.
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
    /// Sender for [`CtrlCmd`] values addressed to the reactor.
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
    /// Cloneable future handed out by `wait_for_close`.
    ///
    /// Since the channel carries `Void`, no value can ever be delivered on
    /// it; the future can only complete when the sending side is dropped.
    /// NOTE(review): presumably the reactor owns that sender -- confirm.
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
    /// The circuit ID on the channel; kept only in test builds.
    #[cfg(test)]
    circid: CircId,
    /// Memory quota account of this circuit; exposed through `mq_account`.
    pub(super) memquota: CircuitAccount,
    /// Time provider; `PendingClientTunnel::new` clones it from the channel.
    pub(super) time_provider: DynTimeProvider,
    /// Whether this is a multi-path tunnel.
    ///
    /// `PendingClientTunnel::new` always initializes this to `false`.
    pub(super) is_multi_path: bool,
}
192
/// The mutable state of every circuit of a tunnel.
///
/// This is a mutex-protected map from a circuit's [`UniqId`] to the shared
/// [`MutableState`] of that circuit.
#[derive(Debug, Default)]
pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
214
215impl TunnelMutableState {
216 pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
218 #[allow(unused)] let state = self
220 .0
221 .lock()
222 .expect("lock poisoned")
223 .insert(unique_id, mutable);
224
225 debug_assert!(state.is_none());
226 }
227
228 pub(super) fn remove(&self, unique_id: UniqId) {
230 #[allow(unused)] let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
232
233 debug_assert!(state.is_some());
234 }
235
236 fn all_paths(&self) -> Vec<Arc<Path>> {
238 let lock = self.0.lock().expect("lock poisoned");
239 lock.values().map(|mutable| mutable.path()).collect()
240 }
241
242 #[allow(unstable_name_collisions)]
250 fn single_path(&self) -> Result<Arc<Path>> {
251 use itertools::Itertools as _;
252
253 self.all_paths().into_iter().exactly_one().map_err(|_| {
254 bad_api_usage!("requested the single path of a multi-path tunnel?!").into()
255 })
256 }
257
258 fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
263 let lock = self.0.lock().expect("lock poisoned");
264 let mutable = lock
265 .get(&unique_id)
266 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
267
268 let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
269 path::HopDetail::Relay(r) => r,
270 #[cfg(feature = "hs-common")]
271 path::HopDetail::Virtual => {
272 panic!("somehow made a circuit with a virtual first hop.")
273 }
274 });
275
276 Ok(first_hop)
277 }
278
279 pub(super) fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
285 let lock = self.0.lock().expect("lock poisoned");
286 let mutable = lock
287 .get(&unique_id)
288 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
289
290 Ok(mutable.last_hop_num())
291 }
292
293 fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
297 let lock = self.0.lock().expect("lock poisoned");
298 let mutable = lock
299 .get(&unique_id)
300 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
301
302 Ok(mutable.n_hops())
303 }
304}
305
/// The mutable state of a single circuit: a [`CircuitState`] behind a mutex.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct MutableState(Mutex<CircuitState>);
310
311impl MutableState {
312 pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
314 let mut mutable = self.0.lock().expect("poisoned lock");
315 Arc::make_mut(&mut mutable.path).push_hop(peer_id);
316 mutable.binding.push(binding);
317 }
318
319 pub(super) fn path(&self) -> Arc<path::Path> {
321 let mutable = self.0.lock().expect("poisoned lock");
322 Arc::clone(&mutable.path)
323 }
324
325 pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
328 let mutable = self.0.lock().expect("poisoned lock");
329
330 mutable.binding.get::<usize>(hop.into()).cloned().flatten()
331 }
334
335 fn first_hop(&self) -> Option<HopDetail> {
337 let mutable = self.0.lock().expect("poisoned lock");
338 mutable.path.first_hop()
339 }
340
341 fn last_hop_num(&self) -> Option<HopNum> {
348 let mutable = self.0.lock().expect("poisoned lock");
349 mutable.path.last_hop_num()
350 }
351
352 fn n_hops(&self) -> usize {
359 let mutable = self.0.lock().expect("poisoned lock");
360 mutable.path.n_hops()
361 }
362}
363
/// The data guarded by [`MutableState`].
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct CircuitState {
    /// The path of the circuit.
    ///
    /// Kept in an `Arc` so that readers can take a cheap handle;
    /// `MutableState::add_hop` updates it through `Arc::make_mut`.
    path: Arc<path::Path>,

    /// Circuit bindings, indexed by hop number.
    ///
    /// `MutableState::add_hop` pushes one entry here for every hop it adds
    /// to `path`.  The field is left out of the `Debug` output.
    #[educe(Debug(ignore))]
    binding: Vec<Option<CircuitBinding>>,
}
385
/// A circuit whose first hop has not been created yet.
///
/// One of the `create_firsthop*` methods consumes this value and, on
/// success, yields a [`ClientTunnel`].
pub struct PendingClientTunnel {
    /// Receiver for the response to our create request; it is handed to the
    /// reactor inside `CtrlMsg::Create`.
    recvcreated: oneshot::Receiver<CreateResponse>,
    /// The circuit handle that is converted into a tunnel once the first
    /// hop exists.
    circ: ClientCirc,
}
397
398impl ClientCirc {
399 pub fn into_tunnel(self) -> Result<ClientTunnel> {
401 self.try_into()
402 }
403
404 pub fn first_hop(&self) -> Result<OwnedChanTarget> {
412 Ok(self
413 .mutable
414 .first_hop(self.unique_id)
415 .map_err(|_| Error::CircuitClosed)?
416 .expect("called first_hop on an un-constructed circuit"))
417 }
418
419 pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
429 let all_paths = self.all_paths();
430 let path = all_paths.first().ok_or_else(|| {
431 tor_error::bad_api_usage!("Called last_hop_info an an un-constructed tunnel")
432 })?;
433 Ok(path
434 .hops()
435 .last()
436 .expect("Called last_hop an an un-constructed circuit")
437 .as_chan_target()
438 .map(OwnedChanTarget::from_chan_target))
439 }
440
441 pub fn last_hop_num(&self) -> Result<HopNum> {
451 Ok(self
452 .mutable
453 .last_hop_num(self.unique_id)?
454 .ok_or_else(|| internal!("no last hop index"))?)
455 }
456
457 pub fn last_hop(&self) -> Result<TargetHop> {
462 let hop_num = self
463 .mutable
464 .last_hop_num(self.unique_id)?
465 .ok_or_else(|| bad_api_usage!("no last hop"))?;
466 Ok((self.unique_id, hop_num).into())
467 }
468
469 pub fn all_paths(&self) -> Vec<Arc<Path>> {
474 self.mutable.all_paths()
475 }
476
477 pub fn single_path(&self) -> Result<Arc<Path>> {
481 self.mutable.single_path()
482 }
483
484 pub async fn disused_since(&self) -> Result<Option<std::time::Instant>> {
493 let (tx, rx) = oneshot::channel();
494 self.command
495 .unbounded_send(CtrlCmd::GetTunnelActivity { sender: tx })
496 .map_err(|_| Error::CircuitClosed)?;
497
498 Ok(rx.await.map_err(|_| Error::CircuitClosed)?.disused_since())
499 }
500
501 pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
505 let (tx, rx) = oneshot::channel();
506
507 self.control
508 .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
509 .map_err(|_| Error::CircuitClosed)?;
510
511 Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
512 }
513
514 pub fn mq_account(&self) -> &CircuitAccount {
516 &self.memquota
517 }
518
519 #[cfg(feature = "hs-service")]
527 pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
528 let (sender, receiver) = oneshot::channel();
529 let msg = CtrlCmd::GetBindingKey { hop, done: sender };
530 self.command
531 .unbounded_send(msg)
532 .map_err(|_| Error::CircuitClosed)?;
533
534 receiver.await.map_err(|_| Error::CircuitClosed)?
535 }
536
537 pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
540 where
541 Tg: CircTarget,
542 {
543 #![allow(deprecated)]
544
545 if target
557 .protovers()
558 .supports_named_subver(named::RELAY_NTORV3)
559 {
560 self.extend_ntor_v3(target, params).await
561 } else {
562 self.extend_ntor(target, params).await
563 }
564 }
565
566 #[deprecated(since = "1.6.1", note = "Use extend instead.")]
569 pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
570 where
571 Tg: CircTarget,
572 {
573 let key = NtorPublicKey {
574 id: *target
575 .rsa_identity()
576 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
577 pk: *target.ntor_onion_key(),
578 };
579 let mut linkspecs = target
580 .linkspecs()
581 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
582 if !params.extend_by_ed25519_id {
583 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
584 }
585
586 let (tx, rx) = oneshot::channel();
587
588 let peer_id = OwnedChanTarget::from_chan_target(target);
589 let settings = HopSettings::from_params_and_caps(
590 HopNegotiationType::None,
591 ¶ms,
592 target.protovers(),
593 )?;
594 self.control
595 .unbounded_send(CtrlMsg::ExtendNtor {
596 peer_id,
597 public_key: key,
598 linkspecs,
599 settings,
600 done: tx,
601 })
602 .map_err(|_| Error::CircuitClosed)?;
603
604 rx.await.map_err(|_| Error::CircuitClosed)??;
605
606 Ok(())
607 }
608
609 #[deprecated(since = "1.6.1", note = "Use extend instead.")]
612 pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
613 where
614 Tg: CircTarget,
615 {
616 let key = NtorV3PublicKey {
617 id: *target
618 .ed_identity()
619 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
620 pk: *target.ntor_onion_key(),
621 };
622 let mut linkspecs = target
623 .linkspecs()
624 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
625 if !params.extend_by_ed25519_id {
626 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
627 }
628
629 let (tx, rx) = oneshot::channel();
630
631 let peer_id = OwnedChanTarget::from_chan_target(target);
632 let settings = HopSettings::from_params_and_caps(
633 HopNegotiationType::Full,
634 ¶ms,
635 target.protovers(),
636 )?;
637 self.control
638 .unbounded_send(CtrlMsg::ExtendNtorV3 {
639 peer_id,
640 public_key: key,
641 linkspecs,
642 settings,
643 done: tx,
644 })
645 .map_err(|_| Error::CircuitClosed)?;
646
647 rx.await.map_err(|_| Error::CircuitClosed)??;
648
649 Ok(())
650 }
651
652 #[cfg(feature = "hs-common")]
676 pub async fn extend_virtual(
677 &self,
678 protocol: handshake::RelayProtocol,
679 role: handshake::HandshakeRole,
680 seed: impl handshake::KeyGenerator,
681 params: &CircParameters,
682 capabilities: &tor_protover::Protocols,
683 ) -> Result<()> {
684 use self::handshake::BoxedClientLayer;
685
686 let negotiation_type = match protocol {
688 handshake::RelayProtocol::HsV3 => HopNegotiationType::HsV3,
689 };
690 let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
691
692 let BoxedClientLayer { fwd, back, binding } =
693 protocol.construct_client_layers(role, seed)?;
694
695 let settings = HopSettings::from_params_and_caps(negotiation_type, params, capabilities)?;
696 let (tx, rx) = oneshot::channel();
697 let message = CtrlCmd::ExtendVirtual {
698 cell_crypto: (fwd, back, binding),
699 settings,
700 done: tx,
701 };
702
703 self.command
704 .unbounded_send(message)
705 .map_err(|_| Error::CircuitClosed)?;
706
707 rx.await.map_err(|_| Error::CircuitClosed)?
708 }
709
710 #[cfg(feature = "circ-padding-manual")]
714 pub async fn start_padding_at_hop(&self, hop: HopNum, padder: CircuitPadder) -> Result<()> {
715 self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), Some(padder))
716 .await
717 }
718
719 #[cfg(feature = "circ-padding-manual")]
723 pub async fn stop_padding_at_hop(&self, hop: HopNum) -> Result<()> {
724 self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), None)
725 .await
726 }
727
728 #[cfg(feature = "circ-padding-manual")]
730 pub(super) async fn set_padder_impl(
731 &self,
732 hop: crate::HopLocation,
733 padder: Option<CircuitPadder>,
734 ) -> Result<()> {
735 let (tx, rx) = oneshot::channel();
736 let msg = CtrlCmd::SetPadder {
737 hop,
738 padder,
739 sender: tx,
740 };
741 self.command
742 .unbounded_send(msg)
743 .map_err(|_| Error::CircuitClosed)?;
744 rx.await.map_err(|_| Error::CircuitClosed)?
745 }
746
747 pub fn is_closing(&self) -> bool {
749 self.control.is_closed()
750 }
751
752 pub fn unique_id(&self) -> UniqId {
754 self.unique_id
755 }
756
757 pub fn n_hops(&self) -> Result<usize> {
764 self.mutable
765 .n_hops(self.unique_id)
766 .map_err(|_| Error::CircuitClosed)
767 }
768
769 pub fn wait_for_close(
776 &self,
777 ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
778 self.reactor_closed_rx.clone().map(|_| ())
779 }
780}
781
782impl PendingClientTunnel {
783 #[allow(clippy::too_many_arguments)]
787 pub(crate) fn new(
788 id: CircId,
789 channel: Arc<Channel>,
790 createdreceiver: oneshot::Receiver<CreateResponse>,
791 input: CircuitRxReceiver,
792 unique_id: UniqId,
793 runtime: DynTimeProvider,
794 memquota: CircuitAccount,
795 padding_ctrl: PaddingController,
796 padding_stream: PaddingEventStream,
797 timeouts: Arc<dyn TimeoutEstimator>,
798 ) -> (PendingClientTunnel, crate::client::reactor::Reactor) {
799 let time_provider = channel.time_provider().clone();
800 let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) = Reactor::new(
801 channel,
802 id,
803 unique_id,
804 input,
805 runtime,
806 memquota.clone(),
807 padding_ctrl,
808 padding_stream,
809 timeouts,
810 );
811
812 let circuit = ClientCirc {
813 mutable,
814 unique_id,
815 control: control_tx,
816 command: command_tx,
817 reactor_closed_rx: reactor_closed_rx.shared(),
818 #[cfg(test)]
819 circid: id,
820 memquota,
821 time_provider,
822 is_multi_path: false,
823 };
824
825 let pending = PendingClientTunnel {
826 recvcreated: createdreceiver,
827 circ: circuit,
828 };
829 (pending, reactor)
830 }
831
832 pub fn peek_unique_id(&self) -> UniqId {
834 self.circ.unique_id
835 }
836
837 pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<ClientTunnel> {
844 let protocols = tor_protover::Protocols::new();
849 let settings =
850 HopSettings::from_params_and_caps(HopNegotiationType::None, ¶ms, &protocols)?;
851 let (tx, rx) = oneshot::channel();
852 self.circ
853 .control
854 .unbounded_send(CtrlMsg::Create {
855 recv_created: self.recvcreated,
856 handshake: CircuitHandshake::CreateFast,
857 settings,
858 done: tx,
859 })
860 .map_err(|_| Error::CircuitClosed)?;
861
862 rx.await.map_err(|_| Error::CircuitClosed)??;
863
864 self.circ.into_tunnel()
865 }
866
867 pub async fn create_firsthop<Tg>(
872 self,
873 target: &Tg,
874 params: CircParameters,
875 ) -> Result<ClientTunnel>
876 where
877 Tg: tor_linkspec::CircTarget,
878 {
879 #![allow(deprecated)]
880 if target
882 .protovers()
883 .supports_named_subver(named::RELAY_NTORV3)
884 {
885 self.create_firsthop_ntor_v3(target, params).await
886 } else {
887 self.create_firsthop_ntor(target, params).await
888 }
889 }
890
891 #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
896 pub async fn create_firsthop_ntor<Tg>(
897 self,
898 target: &Tg,
899 params: CircParameters,
900 ) -> Result<ClientTunnel>
901 where
902 Tg: tor_linkspec::CircTarget,
903 {
904 let (tx, rx) = oneshot::channel();
905 let settings = HopSettings::from_params_and_caps(
906 HopNegotiationType::None,
907 ¶ms,
908 target.protovers(),
909 )?;
910
911 self.circ
912 .control
913 .unbounded_send(CtrlMsg::Create {
914 recv_created: self.recvcreated,
915 handshake: CircuitHandshake::Ntor {
916 public_key: NtorPublicKey {
917 id: *target
918 .rsa_identity()
919 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
920 pk: *target.ntor_onion_key(),
921 },
922 ed_identity: *target
923 .ed_identity()
924 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
925 },
926 settings,
927 done: tx,
928 })
929 .map_err(|_| Error::CircuitClosed)?;
930
931 rx.await.map_err(|_| Error::CircuitClosed)??;
932
933 self.circ.into_tunnel()
934 }
935
936 #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
945 pub async fn create_firsthop_ntor_v3<Tg>(
946 self,
947 target: &Tg,
948 params: CircParameters,
949 ) -> Result<ClientTunnel>
950 where
951 Tg: tor_linkspec::CircTarget,
952 {
953 let settings = HopSettings::from_params_and_caps(
954 HopNegotiationType::Full,
955 ¶ms,
956 target.protovers(),
957 )?;
958 let (tx, rx) = oneshot::channel();
959
960 self.circ
961 .control
962 .unbounded_send(CtrlMsg::Create {
963 recv_created: self.recvcreated,
964 handshake: CircuitHandshake::NtorV3 {
965 public_key: NtorV3PublicKey {
966 id: *target
967 .ed_identity()
968 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
969 pk: *target.ntor_onion_key(),
970 },
971 },
972 settings,
973 done: tx,
974 })
975 .map_err(|_| Error::CircuitClosed)?;
976
977 rx.await.map_err(|_| Error::CircuitClosed)??;
978
979 self.circ.into_tunnel()
980 }
981}
982
983#[cfg(test)]
984pub(crate) mod test {
985 #![allow(clippy::bool_assert_comparison)]
987 #![allow(clippy::clone_on_copy)]
988 #![allow(clippy::dbg_macro)]
989 #![allow(clippy::mixed_attributes_style)]
990 #![allow(clippy::print_stderr)]
991 #![allow(clippy::print_stdout)]
992 #![allow(clippy::single_char_pattern)]
993 #![allow(clippy::unwrap_used)]
994 #![allow(clippy::unchecked_time_subtraction)]
995 #![allow(clippy::useless_vec)]
996 #![allow(clippy::needless_pass_by_value)]
997 use super::*;
1000 use crate::channel::test::{CodecResult, new_reactor};
1001 use crate::circuit::CircuitRxSender;
1002 use crate::client::circuit::padding::new_padding;
1003 use crate::client::stream::DataStream;
1004 #[cfg(feature = "hs-service")]
1005 use crate::client::stream::IncomingStreamRequestFilter;
1006 use crate::congestion::params::CongestionControlParams;
1007 use crate::congestion::test_utils::params::build_cc_vegas_params;
1008 use crate::crypto::cell::RelayCellBody;
1009 use crate::crypto::handshake::ntor_v3::NtorV3Server;
1010 use crate::memquota::SpecificAccount as _;
1011 use crate::stream::flow_ctrl::params::FlowCtrlParameters;
1012 use crate::util::DummyTimeoutEstimator;
1013 use assert_matches::assert_matches;
1014 use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1015 use futures::channel::mpsc::{Receiver, Sender};
1016 use futures::io::{AsyncReadExt, AsyncWriteExt};
1017 use futures::sink::SinkExt;
1018 use futures::stream::StreamExt;
1019 use hex_literal::hex;
1020 use std::collections::{HashMap, VecDeque};
1021 use std::fmt::Debug;
1022 use std::time::Duration;
1023 use tor_basic_utils::test_rng::testing_rng;
1024 use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanCell, ChanCmd, msg as chanmsg};
1025 use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1026 use tor_cell::relaycell::msg::SendmeTag;
1027 use tor_cell::relaycell::{
1028 AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg, msg::AnyRelayMsg,
1029 };
1030 use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
1031 use tor_linkspec::OwnedCircTarget;
1032 use tor_memquota::HasMemoryCost;
1033 use tor_rtcompat::Runtime;
1034 use tor_rtcompat::SpawnExt;
1035 use tracing::trace;
1036 use tracing_test::traced_test;
1037
1038 #[cfg(feature = "conflux")]
1039 use {
1040 crate::client::reactor::ConfluxHandshakeResult,
1041 crate::util::err::ConfluxHandshakeError,
1042 futures::future::FusedFuture,
1043 futures::lock::Mutex as AsyncMutex,
1044 std::pin::Pin,
1045 std::result::Result as StdResult,
1046 tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1047 tor_cell::relaycell::msg::ConfluxLink,
1048 tor_rtmock::MockRuntime,
1049 };
1050
impl PendingClientTunnel {
    /// Testing only: return the circuit ID of the circuit being built.
    pub(crate) fn peek_circid(&self) -> CircId {
        self.circ.circid
    }
}
1057
impl ClientCirc {
    /// Testing only: return the circuit ID of this circuit.
    pub(crate) fn peek_circid(&self) -> CircId {
        self.circid
    }
}
1064
1065 impl ClientTunnel {
1066 pub(crate) async fn resolve_last_hop(&self) -> TargetHop {
1067 let (sender, receiver) = oneshot::channel();
1068 let _ =
1069 self.as_single_circ()
1070 .unwrap()
1071 .command
1072 .unbounded_send(CtrlCmd::ResolveTargetHop {
1073 hop: TargetHop::LastHop,
1074 done: sender,
1075 });
1076 TargetHop::Hop(receiver.await.unwrap().unwrap())
1077 }
1078 }
1079
1080 fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> AnyChanMsg {
1081 let rfmt = RelayCellFormat::V0;
1083 let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1084 .encode(rfmt, &mut testing_rng())
1085 .unwrap();
1086 let chanmsg = chanmsg::Relay::from(body);
1087 AnyChanMsg::Relay(chanmsg)
1088 }
1089
/// Secret key used to build the example ntor and ntor v3 server keys.
const EXAMPLE_SK: [u8; 32] =
    hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
/// Public key used in the example server keys, and as the ntor onion key of
/// the example target.
///
/// NOTE(review): presumably the public key that belongs to `EXAMPLE_SK` --
/// not verifiable from this file.
const EXAMPLE_PK: [u8; 32] =
    hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
/// Ed25519 identity of the example target (also part of the ntor v3 key).
const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
/// RSA identity of the example target (also part of the ntor key).
const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1097
/// Testing only: create a stream channel pair by delegating to
/// `crate::fake_mpsc` with the given `buffer` argument.
#[cfg(test)]
pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
    buffer: usize,
) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
    crate::fake_mpsc(buffer)
}
1105
1106 fn example_target() -> OwnedCircTarget {
1108 let mut builder = OwnedCircTarget::builder();
1109 builder
1110 .chan_target()
1111 .ed_identity(EXAMPLE_ED_ID.into())
1112 .rsa_identity(EXAMPLE_RSA_ID.into());
1113 builder
1114 .ntor_onion_key(EXAMPLE_PK.into())
1115 .protocols("FlowCtrl=1-2".parse().unwrap())
1116 .build()
1117 .unwrap()
1118 }
1119 fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1120 crate::crypto::handshake::ntor::NtorSecretKey::new(
1121 EXAMPLE_SK.into(),
1122 EXAMPLE_PK.into(),
1123 EXAMPLE_RSA_ID.into(),
1124 )
1125 }
1126 fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1127 crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1128 EXAMPLE_SK.into(),
1129 EXAMPLE_PK.into(),
1130 EXAMPLE_ED_ID.into(),
1131 )
1132 }
1133
1134 fn working_fake_channel<R: Runtime>(
1135 rt: &R,
1136 ) -> (Arc<Channel>, Receiver<AnyChanCell>, Sender<CodecResult>) {
1137 let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1138 rt.spawn(async {
1139 let _ignore = chan_reactor.run().await;
1140 })
1141 .unwrap();
1142 (channel, rx, tx)
1143 }
1144
/// Which handshake a test should exercise.
#[derive(Copy, Clone)]
enum HandshakeType {
    /// The `CreateFast` handshake (first hop only; the extend test rejects
    /// it).
    Fast,
    /// The ntor handshake.
    Ntor,
    /// The ntor v3 handshake.
    NtorV3,
}
1152
/// Create a first hop with the handshake named by `handshake_type`, against
/// a simulated relay, and check that the resulting tunnel has one hop.
///
/// `with_cc` is only consulted for `HandshakeType::NtorV3`.  When it is set,
/// the client uses parameters built from `build_cc_vegas_params()`, and the
/// simulated relay insists on seeing a `CcRequest` extension, which it
/// answers with a `CcResponse`.
// The handshake-specific `create_firsthop_*` methods called below are
// deprecated.
#[allow(deprecated)]
async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
    use crate::crypto::handshake::{ServerHandshake, fast::CreateFastServer, ntor::NtorServer};

    let (chan, mut rx, _sink) = working_fake_channel(rt);
    let circid = CircId::new(128).unwrap();
    let (created_send, created_recv) = oneshot::channel();
    let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
    let unique_id = UniqId::new(23, 17);
    let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));

    let (pending, reactor) = PendingClientTunnel::new(
        circid,
        chan,
        created_recv,
        circmsg_recv,
        unique_id,
        DynTimeProvider::new(rt.clone()),
        CircuitAccount::new_noop(),
        padding_ctrl,
        padding_stream,
        Arc::new(DummyTimeoutEstimator),
    );

    // The circuit reactor has to be running before the client can make
    // progress.
    rt.spawn(async {
        let _ignore = reactor.run().await;
    })
    .unwrap();

    // Future that plays the relay: read the create cell from the channel,
    // run the server side of the handshake, and deliver the reply.
    let simulate_relay_fut = async move {
        let mut rng = testing_rng();
        let create_cell = rx.next().await.unwrap();
        assert_eq!(create_cell.circid(), Some(circid));
        let reply = match handshake_type {
            HandshakeType::Fast => {
                let cf = match create_cell.msg() {
                    AnyChanMsg::CreateFast(cf) => cf,
                    other => panic!("{:?}", other),
                };
                let (_, rep) = CreateFastServer::server(
                    &mut rng,
                    &mut |_: &()| Some(()),
                    &[()],
                    cf.handshake(),
                )
                .unwrap();
                CreateResponse::CreatedFast(CreatedFast::new(rep))
            }
            HandshakeType::Ntor => {
                let c2 = match create_cell.msg() {
                    AnyChanMsg::Create2(c2) => c2,
                    other => panic!("{:?}", other),
                };
                let (_, rep) = NtorServer::server(
                    &mut rng,
                    &mut |_: &()| Some(()),
                    &[example_ntor_key()],
                    c2.body(),
                )
                .unwrap();
                CreateResponse::Created2(Created2::new(rep))
            }
            HandshakeType::NtorV3 => {
                let c2 = match create_cell.msg() {
                    AnyChanMsg::Create2(c2) => c2,
                    other => panic!("{:?}", other),
                };
                // Neither closure captures anything, so both branches can
                // have the same type.
                let mut reply_fn = if with_cc {
                    |client_exts: &[CircRequestExt]| {
                        let _ = client_exts
                            .iter()
                            .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
                            .expect("Client failed to request CC");
                        // NOTE(review): the meaning of the value 31 is not
                        // visible in this file -- confirm against
                        // `CcResponse::new`.
                        Some(vec![CircResponseExt::CcResponse(
                            extend_ext::CcResponse::new(31),
                        )])
                    }
                } else {
                    |_: &_| Some(vec![])
                };
                let (_, rep) = NtorV3Server::server(
                    &mut rng,
                    &mut reply_fn,
                    &[example_ntor_v3_key()],
                    c2.body(),
                )
                .unwrap();
                CreateResponse::Created2(Created2::new(rep))
            }
        };
        created_send.send(reply).unwrap();
    };
    // Future that plays the client: request the first hop with the chosen
    // handshake.
    let client_fut = async move {
        let target = example_target();
        let params = CircParameters::default();
        let ret = match handshake_type {
            HandshakeType::Fast => {
                trace!("doing fast create");
                pending.create_firsthop_fast(params).await
            }
            HandshakeType::Ntor => {
                trace!("doing ntor create");
                pending.create_firsthop_ntor(&target, params).await
            }
            HandshakeType::NtorV3 => {
                let params = if with_cc {
                    CircParameters::new(
                        true,
                        build_cc_vegas_params(),
                        FlowCtrlParameters::defaults_for_tests(),
                    )
                } else {
                    params
                };
                trace!("doing ntor_v3 create");
                pending.create_firsthop_ntor_v3(&target, params).await
            }
        };
        trace!("create done: result {:?}", ret);
        ret
    };

    // Drive both sides to completion on the current task.
    let (circ, _) = futures::join!(client_fut, simulate_relay_fut);

    let _circ = circ.unwrap();

    assert_eq!(_circ.n_hops().unwrap(), 1);
}
1290
// Runs `test_create` with `HandshakeType::Fast` through
// `test_with_all_runtimes!`.
#[traced_test]
#[test]
fn test_create_fast() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        test_create(&rt, HandshakeType::Fast, false).await;
    });
}
// Runs `test_create` with `HandshakeType::Ntor` through
// `test_with_all_runtimes!`.
#[traced_test]
#[test]
fn test_create_ntor() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        test_create(&rt, HandshakeType::Ntor, false).await;
    });
}
// Runs `test_create` with `HandshakeType::NtorV3` and `with_cc` disabled,
// through `test_with_all_runtimes!`.
#[traced_test]
#[test]
fn test_create_ntor_v3() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        test_create(&rt, HandshakeType::NtorV3, false).await;
    });
}
// Runs `test_create` with `HandshakeType::NtorV3` and `with_cc` enabled,
// through `test_with_all_runtimes!`.  Only built with the "flowctl-cc"
// feature.
#[traced_test]
#[test]
#[cfg(feature = "flowctl-cc")]
fn test_create_ntor_v3_with_cc() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        test_create(&rt, HandshakeType::NtorV3, true).await;
    });
}
1320
/// A stand-in crypto layer for tests.
///
/// It leaves cell bodies untouched, and produces tags derived from a
/// counter.
pub(crate) struct DummyCrypto {
    /// Bytes of the most recent tag; `next_tag` overwrites the first four
    /// with the counter value.
    counter_tag: [u8; 20],
    /// Counter that `next_tag` increments on every call.
    counter: u32,
    /// Whether `decrypt_inbound` returns a tag for incoming cells.
    lasthop: bool,
}
1328 impl DummyCrypto {
1329 fn next_tag(&mut self) -> SendmeTag {
1330 #![allow(clippy::identity_op)]
1331 self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1332 self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1333 self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1334 self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1335 self.counter += 1;
1336 self.counter_tag.into()
1337 }
1338 }
1339
impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
    /// Leave the cell untouched, and return the next counter-based tag.
    fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
        self.next_tag()
    }
    /// Do nothing: the cell is left untouched.
    fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
}
1346 impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1347 fn decrypt_inbound(
1348 &mut self,
1349 _cmd: ChanCmd,
1350 _cell: &mut RelayCellBody,
1351 ) -> Option<SendmeTag> {
1352 if self.lasthop {
1353 Some(self.next_tag())
1354 } else {
1355 None
1356 }
1357 }
1358 }
1359 impl DummyCrypto {
1360 pub(crate) fn new(lasthop: bool) -> Self {
1361 DummyCrypto {
1362 counter_tag: [0; 20],
1363 counter: 0,
1364 lasthop,
1365 }
1366 }
1367 }
1368
1369 async fn newtunnel_ext<R: Runtime>(
1372 rt: &R,
1373 unique_id: UniqId,
1374 chan: Arc<Channel>,
1375 hops: Vec<path::HopDetail>,
1376 next_msg_from: HopNum,
1377 params: CircParameters,
1378 ) -> (ClientTunnel, CircuitRxSender) {
1379 let circid = CircId::new(128).unwrap();
1380 let (_created_send, created_recv) = oneshot::channel();
1381 let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1382 let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
1383
1384 let (pending, reactor) = PendingClientTunnel::new(
1385 circid,
1386 chan,
1387 created_recv,
1388 circmsg_recv,
1389 unique_id,
1390 DynTimeProvider::new(rt.clone()),
1391 CircuitAccount::new_noop(),
1392 padding_ctrl,
1393 padding_stream,
1394 Arc::new(DummyTimeoutEstimator),
1395 );
1396
1397 rt.spawn(async {
1398 let _ignore = reactor.run().await;
1399 })
1400 .unwrap();
1401 let PendingClientTunnel {
1402 circ,
1403 recvcreated: _,
1404 } = pending;
1405
1406 let relay_cell_format = RelayCellFormat::V0;
1408
1409 let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
1410 for (idx, peer_id) in hops.into_iter().enumerate() {
1411 let (tx, rx) = oneshot::channel();
1412 let idx = idx as u8;
1413
1414 circ.command
1415 .unbounded_send(CtrlCmd::AddFakeHop {
1416 relay_cell_format,
1417 fwd_lasthop: idx == last_hop_num,
1418 rev_lasthop: idx == u8::from(next_msg_from),
1419 peer_id,
1420 params: params.clone(),
1421 done: tx,
1422 })
1423 .unwrap();
1424 rx.await.unwrap().unwrap();
1425 }
1426 (circ.into_tunnel().unwrap(), circmsg_send)
1427 }
1428
1429 async fn newtunnel<R: Runtime>(
1432 rt: &R,
1433 chan: Arc<Channel>,
1434 ) -> (Arc<ClientTunnel>, CircuitRxSender) {
1435 let hops = std::iter::repeat_with(|| {
1436 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1437 .ed_identity([4; 32].into())
1438 .rsa_identity([5; 20].into())
1439 .build()
1440 .expect("Could not construct fake hop");
1441
1442 path::HopDetail::Relay(peer_id)
1443 })
1444 .take(3)
1445 .collect();
1446
1447 let unique_id = UniqId::new(23, 17);
1448 let (tunnel, circmsg_send) = newtunnel_ext(
1449 rt,
1450 unique_id,
1451 chan,
1452 hops,
1453 2.into(),
1454 CircParameters::default(),
1455 )
1456 .await;
1457
1458 (Arc::new(tunnel), circmsg_send)
1459 }
1460
1461 fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
1464 (0..n)
1465 .map(|idx| {
1466 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1467 .ed_identity([idx + start_idx; 32].into())
1468 .rsa_identity([idx + start_idx + 1; 20].into())
1469 .build()
1470 .expect("Could not construct fake hop");
1471
1472 path::HopDetail::Relay(peer_id)
1473 })
1474 .collect()
1475 }
1476
    /// Extend a three-hop fake tunnel by one more hop using `handshake_type`,
    /// then verify the resulting path.
    ///
    /// Two futures run together: one asks the circuit to extend to
    /// `example_target()`; the other plays the far end, reading the outgoing
    /// EXTEND2 and answering with an EXTENDED2 produced by the matching
    /// server-side handshake.
    ///
    /// # Panics
    ///
    /// Panics if `handshake_type` is `HandshakeType::Fast`, or if any check
    /// fails.
    #[allow(deprecated)]
    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
        use crate::crypto::handshake::{ServerHandshake, ntor::NtorServer};

        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let (tunnel, mut sink) = newtunnel(rt, chan).await;
        let circ = Arc::new(tunnel.as_single_circ().unwrap());
        let circid = circ.peek_circid();
        let params = CircParameters::default();

        // Client side: request the extension with the chosen handshake.
        let extend_fut = async move {
            let target = example_target();
            match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
            };
            // Hand the circuit back so it can be inspected once both futures finish.
            circ };
        // Relay side: read the EXTEND2 and reply with a valid EXTENDED2.
        let reply_fut = async move {
            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            assert_eq!(id, Some(circid));
            // The extend request is expected to arrive in a RELAY_EARLY cell.
            let rmsg = match chmsg {
                AnyChanMsg::RelayEarly(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let e2 = match rmsg.msg() {
                AnyRelayMsg::Extend2(e2) => e2,
                other => panic!("{:?}", other),
            };
            let mut rng = testing_rng();
            // Run the server half of the handshake on the client's payload.
            let reply = match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => {
                    let (_keygen, reply) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
                HandshakeType::NtorV3 => {
                    let (_keygen, reply) = NtorV3Server::server(
                        &mut rng,
                        &mut |_: &[CircRequestExt]| Some(vec![]),
                        &[example_ntor_v3_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
            };

            let extended2 = relaymsg::Extended2::new(reply).into();
            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
            // Return both channel ends so they stay alive until the join completes.
            (sink, rx) };

        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);

        // Three original hops plus the new one.
        assert_eq!(circ.n_hops().unwrap(), 4);

        {
            // Check the path via `all_hops()`, keeping only relay hops.
            let path = circ.single_path().unwrap();
            let path = path
                .all_hops()
                .filter_map(|hop| match hop {
                    path::HopDetail::Relay(r) => Some(r),
                    #[cfg(feature = "hs-common")]
                    path::HopDetail::Virtual => None,
                })
                .collect::<Vec<_>>();

            assert_eq!(path.len(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
        }
        {
            // Same checks again through `hops()` / `as_chan_target()`.
            let path = circ.single_path().unwrap();
            assert_eq!(path.n_hops(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(
                path.hops()[3].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
            assert_ne!(
                path.hops()[0].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
        }
    }
1578
1579 #[traced_test]
1580 #[test]
1581 fn test_extend_ntor() {
1582 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1583 test_extend(&rt, HandshakeType::Ntor).await;
1584 });
1585 }
1586
1587 #[traced_test]
1588 #[test]
1589 fn test_extend_ntor_v3() {
1590 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1591 test_extend(&rt, HandshakeType::NtorV3).await;
1592 });
1593 }
1594
    /// Try to extend a three-hop fake tunnel with ntor, answer the EXTEND2
    /// with `bad_reply`, and return the error that the extend attempt yields.
    ///
    /// `reply_hop` is forwarded to `newtunnel_ext` (presumably selecting the
    /// hop that replies will appear to come from -- confirm against
    /// `newtunnel_ext`).
    ///
    /// # Panics
    ///
    /// Panics if the extend attempt unexpectedly succeeds, or if the tunnel
    /// does not still report three hops afterwards.
    #[allow(deprecated)]
    async fn bad_extend_test_impl<R: Runtime>(
        rt: &R,
        reply_hop: HopNum,
        bad_reply: AnyChanMsg,
    ) -> Error {
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        // Three identical fake relay hops.
        let hops = std::iter::repeat_with(|| {
            let peer_id = tor_linkspec::OwnedChanTarget::builder()
                .ed_identity([4; 32].into())
                .rsa_identity([5; 20].into())
                .build()
                .expect("Could not construct fake hop");

            path::HopDetail::Relay(peer_id)
        })
        .take(3)
        .collect();

        let unique_id = UniqId::new(23, 17);
        let (tunnel, mut sink) = newtunnel_ext(
            rt,
            unique_id,
            chan,
            hops,
            reply_hop,
            CircParameters::default(),
        )
        .await;
        let params = CircParameters::default();

        let target = example_target();
        // Background task playing the relay: wait for the EXTEND2, then send
        // the caller-supplied bad reply.
        let reply_task_handle = rt
            .spawn_with_handle(async move {
                let (_circid, chanmsg) = rx.next().await.unwrap().into_circid_and_msg();
                let AnyChanMsg::RelayEarly(relay_early) = chanmsg else {
                    panic!("unexpected message {chanmsg:?}");
                };
                let relaymsg = UnparsedRelayMsg::from_singleton_body(
                    RelayCellFormat::V0,
                    relay_early.into_relay_body(),
                )
                .unwrap();
                assert_eq!(relaymsg.cmd(), RelayCmd::EXTEND2);

                sink.send(bad_reply).await.unwrap();
                // Return the sender so the caller can keep it alive.
                sink
            })
            .unwrap();
        let outcome = tunnel
            .as_single_circ()
            .unwrap()
            .extend_ntor(&target, params)
            .await;
        let _sink = reply_task_handle.await;

        // The failed extension must not have added a hop.
        assert_eq!(tunnel.n_hops().unwrap(), 3);
        assert!(outcome.is_err());
        outcome.unwrap_err()
    }
1657
1658 #[traced_test]
1659 #[test]
1660 fn bad_extend_wronghop() {
1661 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1662 let extended2 = relaymsg::Extended2::new(vec![]).into();
1663 let cc = rmsg_to_ccmsg(None, extended2);
1664
1665 let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
1666 match error {
1671 Error::CircuitClosed => {}
1672 x => panic!("got other error: {}", x),
1673 }
1674 });
1675 }
1676
1677 #[traced_test]
1678 #[test]
1679 fn bad_extend_wrongtype() {
1680 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1681 let extended = relaymsg::Extended::new(vec![7; 200]).into();
1682 let cc = rmsg_to_ccmsg(None, extended);
1683
1684 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1685 match error {
1686 Error::BytesErr {
1687 err: tor_bytes::Error::InvalidMessage(_),
1688 object: "extended2 message",
1689 } => {}
1690 other => panic!("{:?}", other),
1691 }
1692 });
1693 }
1694
1695 #[traced_test]
1696 #[test]
1697 fn bad_extend_destroy() {
1698 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1699 let cc = AnyChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
1700 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1701 match error {
1702 Error::CircuitClosed => {}
1703 other => panic!("{:?}", other),
1704 }
1705 });
1706 }
1707
1708 #[traced_test]
1709 #[test]
1710 fn bad_extend_crypto() {
1711 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1712 let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
1713 let cc = rmsg_to_ccmsg(None, extended2);
1714 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1715 assert_matches!(error, Error::BadCircHandshakeAuth);
1716 });
1717 }
1718
1719 #[traced_test]
1720 #[test]
1721 fn begindir() {
1722 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1723 let (chan, mut rx, _sink) = working_fake_channel(&rt);
1724 let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1725 let circ = tunnel.as_single_circ().unwrap();
1726 let circid = circ.peek_circid();
1727
1728 let begin_and_send_fut = async move {
1729 let mut stream = tunnel.begin_dir_stream().await.unwrap();
1732 stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
1733 stream.flush().await.unwrap();
1734 let mut buf = [0_u8; 1024];
1735 let n = stream.read(&mut buf).await.unwrap();
1736 assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
1737 let n = stream.read(&mut buf).await.unwrap();
1738 assert_eq!(n, 0);
1739 stream
1740 };
1741 let reply_fut = async move {
1742 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1745 assert_eq!(id, Some(circid));
1746 let rmsg = match chmsg {
1747 AnyChanMsg::Relay(r) => {
1748 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1749 .unwrap()
1750 }
1751 other => panic!("{:?}", other),
1752 };
1753 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1754 assert_matches!(rmsg, AnyRelayMsg::BeginDir(_));
1755
1756 let connected = relaymsg::Connected::new_empty().into();
1758 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1759
1760 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1762 assert_eq!(id, Some(circid));
1763 let rmsg = match chmsg {
1764 AnyChanMsg::Relay(r) => {
1765 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1766 .unwrap()
1767 }
1768 other => panic!("{:?}", other),
1769 };
1770 let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
1771 assert_eq!(streamid_2, streamid);
1772 if let AnyRelayMsg::Data(d) = rmsg {
1773 assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
1774 } else {
1775 panic!();
1776 }
1777
1778 let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
1780 .unwrap()
1781 .into();
1782 sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
1783
1784 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
1786 sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
1787
1788 (rx, sink) };
1790
1791 let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
1792 });
1793 }
1794
    /// Open a stream on a fake tunnel, end it, and check that an END message
    /// is sent.
    ///
    /// If `by_drop` is true, both halves of the stream are dropped; otherwise
    /// the writer is closed explicitly and the reader is kept.
    fn close_stream_helper(by_drop: bool) {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut sink) = newtunnel(&rt, chan).await;

            // Client side: open the stream, then end it one way or the other.
            let stream_fut = async move {
                let stream = tunnel
                    .begin_stream("www.example.com", 80, None)
                    .await
                    .unwrap();

                let (r, mut w) = stream.split();
                if by_drop {
                    drop(r);
                    drop(w);
                    // Return the tunnel so it outlives this future.
                    (None, tunnel) } else {
                    w.close().await.unwrap();
                    (Some(r), tunnel)
                }
            };
            // Relay side: expect BEGIN, answer CONNECTED, then expect END.
            let handler_fut = async {
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let rmsg = match msg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);

                let connected =
                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();

                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let rmsg = match msg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (_, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(rmsg.cmd(), RelayCmd::END);

                // Return both ends so they stay alive until the join completes.
                (rx, sink) };

            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
        });
    }
1855
1856 #[traced_test]
1857 #[test]
1858 fn drop_stream() {
1859 close_stream_helper(true);
1860 }
1861
1862 #[traced_test]
1863 #[test]
1864 fn close_stream() {
1865 close_stream_helper(false);
1866 }
1867
    /// Check how data arriving on a locally closed stream is treated before
    /// and after a timeout elapses.
    ///
    /// The test expects that a DATA message sent shortly after the writer is
    /// closed leaves the tunnel open, while one sent after the mock clock has
    /// advanced by twice `DummyTimeoutEstimator.circuit_build_timeout(3)`
    /// causes the tunnel to be closed.
    #[traced_test]
    #[test]
    fn expire_halfstreams() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut sink) = newtunnel(&rt, chan).await;

            // Client side: open a stream and close its writer right away.
            let client_fut = async move {
                let stream = tunnel
                    .begin_stream("www.example.com", 80, None)
                    .await
                    .unwrap();

                let (r, mut w) = stream.split();
                w.close().await.unwrap();
                (Some(r), tunnel)
            };
            // Exit side: expect BEGIN and answer CONNECTED.
            let exit_fut = async {
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let rmsg = match msg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);

                let connected =
                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();

                (rx, streamid, sink) };

            let ((_opt_reader, tunnel), (_rx, streamid, mut sink)) =
                futures::join!(client_fut, exit_fut);

            // Let all pending work run before inspecting the tunnel.
            rt.progress_until_stalled().await;

            assert!(!tunnel.is_closed());

            // DATA on the closed stream, before the timeout: tunnel stays open.
            let data = relaymsg::Data::new(b"hello").unwrap();
            sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data)))
                .await
                .unwrap();
            rt.progress_until_stalled().await;

            assert!(!tunnel.is_closed());

            // Advance the mock clock well past the timeout.
            let stream_timeout = DummyTimeoutEstimator.circuit_build_timeout(3);
            rt.advance_by(2 * stream_timeout).await;

            // The same DATA message is now expected to close the tunnel.
            let data = relaymsg::Data::new(b"hello").unwrap();
            sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data)))
                .await
                .unwrap();
            rt.progress_until_stalled().await;

            assert!(tunnel.is_closed());
        });
    }
1947
1948 async fn setup_incoming_sendme_case<R: Runtime>(
1950 rt: &R,
1951 n_to_send: usize,
1952 ) -> (
1953 Arc<ClientTunnel>,
1954 DataStream,
1955 CircuitRxSender,
1956 Option<StreamId>,
1957 usize,
1958 Receiver<AnyChanCell>,
1959 Sender<CodecResult>,
1960 ) {
1961 let (chan, mut rx, sink2) = working_fake_channel(rt);
1962 let (tunnel, mut sink) = newtunnel(rt, chan).await;
1963 let circid = tunnel.as_single_circ().unwrap().peek_circid();
1964
1965 let begin_and_send_fut = {
1966 let tunnel = tunnel.clone();
1967 async move {
1968 let mut stream = tunnel
1970 .begin_stream("www.example.com", 443, None)
1971 .await
1972 .unwrap();
1973 let junk = [0_u8; 1024];
1974 let mut remaining = n_to_send;
1975 while remaining > 0 {
1976 let n = std::cmp::min(remaining, junk.len());
1977 stream.write_all(&junk[..n]).await.unwrap();
1978 remaining -= n;
1979 }
1980 stream.flush().await.unwrap();
1981 stream
1982 }
1983 };
1984
1985 let receive_fut = async move {
1986 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1988 let rmsg = match chmsg {
1989 AnyChanMsg::Relay(r) => {
1990 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1991 .unwrap()
1992 }
1993 other => panic!("{:?}", other),
1994 };
1995 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1996 assert_matches!(rmsg, AnyRelayMsg::Begin(_));
1997 let connected = relaymsg::Connected::new_empty().into();
1999 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2000 let mut bytes_received = 0_usize;
2002 let mut cells_received = 0_usize;
2003 while bytes_received < n_to_send {
2004 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2006 assert_eq!(id, Some(circid));
2007
2008 let rmsg = match chmsg {
2009 AnyChanMsg::Relay(r) => {
2010 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2011 .unwrap()
2012 }
2013 other => panic!("{:?}", other),
2014 };
2015 let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2016 assert_eq!(streamid2, streamid);
2017 if let AnyRelayMsg::Data(dat) = rmsg {
2018 cells_received += 1;
2019 bytes_received += dat.as_ref().len();
2020 } else {
2021 panic!();
2022 }
2023 }
2024
2025 (sink, streamid, cells_received, rx)
2026 };
2027
2028 let (stream, (sink, streamid, cells_received, rx)) =
2029 futures::join!(begin_and_send_fut, receive_fut);
2030
2031 (tunnel, stream, sink, streamid, cells_received, rx, sink2)
2032 }
2033
    /// A circuit-level SENDME carrying an expected tag must be accepted and
    /// must enlarge the send window.
    ///
    /// After sending `300 * 498 + 3` bytes the test expects 301 DATA cells
    /// (presumably 498 is the per-cell payload size -- confirm), a send window
    /// of `1000 - 301`, and three recorded tags.  After the SENDMEs are
    /// delivered, the window is expected to be `1000 - 201`.
    #[traced_test]
    #[test]
    fn accept_valid_sendme() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (tunnel, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
            let circ = tunnel.as_single_circ().unwrap();

            assert_eq!(cells_received, 301);

            {
                // Query the send window and pending tags for hop 2 before any SENDME.
                let (tx, rx) = oneshot::channel();
                circ.command
                    .unbounded_send(CtrlCmd::QuerySendWindow {
                        hop: 2.into(),
                        leg: tunnel.unique_id(),
                        done: tx,
                    })
                    .unwrap();
                let (window, tags) = rx.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 301);
                assert_eq!(tags.len(), 3);
                // The leading bytes are 100, 200 and 300 as little-endian values.
                assert_eq!(
                    tags[0],
                    SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
                );
                assert_eq!(
                    tags[1],
                    SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
                );
                assert_eq!(
                    tags[2],
                    SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
                );
            }

            let reply_with_sendme_fut = async move {
                // Circuit-level SENDME (no stream ID) carrying the first tag.
                let c_sendme =
                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
                        .into();
                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();

                // Stream-level SENDME with an empty body.
                let s_sendme = relaymsg::Sendme::new_empty().into();
                sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();

                sink
            };

            let _sink = reply_with_sendme_fut.await;

            rt.advance_until_stalled().await;

            {
                // Query again: the window should have grown by 100.
                let (tx, rx) = oneshot::channel();
                circ.command
                    .unbounded_send(CtrlCmd::QuerySendWindow {
                        hop: 2.into(),
                        leg: tunnel.unique_id(),
                        done: tx,
                    })
                    .unwrap();
                let (window, _tags) = rx.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 201);
            }
        });
    }
2108
2109 #[traced_test]
2110 #[test]
2111 fn invalid_circ_sendme() {
2112 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2113 let (tunnel, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2117 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2118
2119 let reply_with_sendme_fut = async move {
2120 let c_sendme =
2122 relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2123 .into();
2124 sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2125 sink
2126 };
2127
2128 let _sink = reply_with_sendme_fut.await;
2129
2130 rt.advance_until_stalled().await;
2132 assert!(tunnel.is_closed());
2133 });
2134 }
2135
    /// Several streams writing as fast as they can over one tunnel should
    /// each get a comparable share of the cells sent.
    ///
    /// A spawned task opens `N_STREAMS` streams and writes to them in
    /// round-robin order.  The fake relay stops after `N_BYTES` bytes of DATA
    /// in total and checks how many bytes came from each stream.
    #[traced_test]
    #[test]
    fn test_busy_stream_fairness() {
        /// Number of competing streams.
        const N_STREAMS: usize = 3;
        /// Number of full DATA cells' worth of bytes to observe.
        const N_CELLS: usize = 20;
        /// Bytes each client tries to write; also the total at which the
        /// receiver stops counting.
        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
        /// Lower bound per stream: an equal share, minus one full cell of slack.
        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;

        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut sink) = newtunnel(&rt, chan).await;

            // Writer task: runs in the background while the handler below reads.
            rt.spawn({
                let tunnel = tunnel.clone();
                async move {
                    let mut clients = VecDeque::new();
                    struct Client {
                        stream: DataStream,
                        to_write: &'static [u8],
                    }
                    for _ in 0..N_STREAMS {
                        clients.push_back(Client {
                            stream: tunnel
                                .begin_stream("www.example.com", 80, None)
                                .await
                                .unwrap(),
                            to_write: &[0_u8; N_BYTES][..],
                        });
                    }
                    // Round-robin: one write per client, then back of the queue.
                    while let Some(mut client) = clients.pop_front() {
                        if client.to_write.is_empty() {
                            // Finished: do not re-queue this client.
                            continue;
                        }
                        let written = client.stream.write(client.to_write).await.unwrap();
                        client.to_write = &client.to_write[written..];
                        clients.push_back(client);
                    }
                }
            })
            .unwrap();

            // Relay side: accept every BEGIN and tally DATA bytes per stream.
            let channel_handler_fut = async {
                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
                let mut total_bytes_received = 0;

                loop {
                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                    let rmsg = match msg {
                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
                            RelayCellFormat::V0,
                            r.into_relay_body(),
                        )
                        .unwrap(),
                        other => panic!("Unexpected chanmsg: {other:?}"),
                    };
                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                    match rmsg.cmd() {
                        RelayCmd::BEGIN => {
                            // Each stream ID must be seen for the first time here.
                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
                            assert_eq!(prev, None);
                            let connected = relaymsg::Connected::new_with_addr(
                                "10.0.0.1".parse().unwrap(),
                                1234,
                            )
                            .into();
                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                        }
                        RelayCmd::DATA => {
                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
                            let nbytes = data_msg.as_ref().len();
                            total_bytes_received += nbytes;
                            let streamid = streamid.unwrap();
                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
                            *stream_bytes += nbytes;
                            if total_bytes_received >= N_BYTES {
                                break;
                            }
                        }
                        RelayCmd::END => {
                            // END messages are tolerated and ignored.
                            continue;
                        }
                        other => {
                            panic!("Unexpected command {other:?}");
                        }
                    }
                }

                // Return `rx` and `sink` too, so they are not dropped early.
                (total_bytes_received, stream_bytes_received, rx, sink)
            };

            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
                channel_handler_fut.await;
            assert_eq!(stream_bytes_received.len(), N_STREAMS);
            for (sid, stream_bytes) in stream_bytes_received {
                assert!(
                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
                );
            }
        });
    }
2264
2265 #[test]
2266 fn basic_params() {
2267 use super::CircParameters;
2268 let mut p = CircParameters::default();
2269 assert!(p.extend_by_ed25519_id);
2270
2271 p.extend_by_ed25519_id = false;
2272 assert!(!p.extend_by_ed25519_id);
2273 }
2274
2275 #[cfg(feature = "hs-service")]
2276 struct AllowAllStreamsFilter;
2277 #[cfg(feature = "hs-service")]
2278 impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
2279 fn disposition(
2280 &mut self,
2281 _ctx: &crate::client::stream::IncomingStreamRequestContext<'_>,
2282 _circ: &crate::circuit::CircHopSyncView<'_>,
2283 ) -> Result<crate::client::stream::IncomingStreamRequestDisposition> {
2284 Ok(crate::client::stream::IncomingStreamRequestDisposition::Accept)
2285 }
2286 }
2287
2288 #[traced_test]
2289 #[test]
2290 #[cfg(feature = "hs-service")]
2291 fn allow_stream_requests_twice() {
2292 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2293 let (chan, _rx, _sink) = working_fake_channel(&rt);
2294 let (tunnel, _send) = newtunnel(&rt, chan).await;
2295
2296 let _incoming = tunnel
2297 .allow_stream_requests(
2298 &[tor_cell::relaycell::RelayCmd::BEGIN],
2299 tunnel.resolve_last_hop().await,
2300 AllowAllStreamsFilter,
2301 )
2302 .await
2303 .unwrap();
2304
2305 let incoming = tunnel
2306 .allow_stream_requests(
2307 &[tor_cell::relaycell::RelayCmd::BEGIN],
2308 tunnel.resolve_last_hop().await,
2309 AllowAllStreamsFilter,
2310 )
2311 .await;
2312
2313 assert!(incoming.is_err());
2315 });
2316 }
2317
    /// An incoming BEGIN on a tunnel that allows stream requests can be
    /// accepted, and data sent afterwards reaches the accepted stream.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn allow_stream_requests() {
        use tor_cell::relaycell::msg::BeginFlags;

        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            const TEST_DATA: &[u8] = b"ping";

            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut send) = newtunnel(&rt, chan).await;

            let rfmt = RelayCellFormat::V0;

            // Used by the service to tell the client that the stream was accepted.
            let (tx, rx) = oneshot::channel();
            let mut incoming = tunnel
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    tunnel.resolve_last_hop().await,
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();

            // Service side: accept the first request, then read the payload.
            let simulate_service = async move {
                let stream = incoming.next().await.unwrap();
                let mut data_stream = stream
                    .accept_data(relaymsg::Connected::new_empty())
                    .await
                    .unwrap();
                // Signal that the stream is now open.
                tx.send(()).unwrap();

                let mut buf = [0_u8; TEST_DATA.len()];
                data_stream.read_exact(&mut buf).await.unwrap();
                assert_eq!(&buf, TEST_DATA);

                // Return the tunnel so it outlives this future.
                tunnel
            };

            // Client side: send BEGIN, wait for acceptance, then send DATA.
            let simulate_client = async move {
                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let begin_msg = chanmsg::Relay::from(body);

                send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();

                // Wait until the service has accepted the stream.
                rx.await.unwrap();
                let data = relaymsg::Data::new(TEST_DATA).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let data_msg = chanmsg::Relay::from(body);

                send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
                send
            };

            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
2392
    /// After an incoming stream request is rejected, a second request with
    /// the same stream ID can still be accepted and carry data.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn accept_stream_after_reject() {
        use tor_cell::relaycell::msg::AnyRelayMsg;
        use tor_cell::relaycell::msg::BeginFlags;
        use tor_cell::relaycell::msg::EndReason;

        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            const TEST_DATA: &[u8] = b"ping";
            /// Number of BEGIN requests the client sends.
            const STREAM_COUNT: usize = 2;
            let rfmt = RelayCellFormat::V0;

            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut send) = newtunnel(&rt, chan).await;

            // The service reports on this channel once each request is handled.
            let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);

            let mut incoming = tunnel
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    tunnel.resolve_last_hop().await,
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();

            // Service side: reject the first request, accept the second.
            let simulate_service = async move {
                for i in 0..STREAM_COUNT {
                    let stream = incoming.next().await.unwrap();

                    if i == 0 {
                        stream
                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
                            .await
                            .unwrap();
                        // Tell the client that the request has been handled.
                        tx.send(()).await.unwrap();
                        continue;
                    }

                    let mut data_stream = stream
                        .accept_data(relaymsg::Connected::new_empty())
                        .await
                        .unwrap();
                    // Tell the client that the stream is now open.
                    tx.send(()).await.unwrap();

                    let mut buf = [0_u8; TEST_DATA.len()];
                    data_stream.read_exact(&mut buf).await.unwrap();
                    assert_eq!(&buf, TEST_DATA);
                }

                // Return the tunnel so it outlives this future.
                tunnel
            };

            // Client side: send the same BEGIN twice, then DATA.
            let simulate_client = async move {
                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let begin_msg = chanmsg::Relay::from(body);

                for _ in 0..STREAM_COUNT {
                    send.send(AnyChanMsg::Relay(begin_msg.clone()))
                        .await
                        .unwrap();

                    // Wait for the service to handle this request before continuing.
                    rx.next().await.unwrap();
                }

                let data = relaymsg::Data::new(TEST_DATA).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let data_msg = chanmsg::Relay::from(body);

                send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
                send
            };

            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
2487
    /// When stream requests are allowed only from hop `EXPECTED_HOP`, a BEGIN
    /// injected on the tunnel must not yield an incoming stream: the test
    /// expects the incoming-request stream to end instead.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn incoming_stream_bad_hop() {
        use tor_cell::relaycell::msg::BeginFlags;

        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            /// The only hop that stream requests are accepted from.
            const EXPECTED_HOP: u8 = 1;
            let rfmt = RelayCellFormat::V0;

            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut send) = newtunnel(&rt, chan).await;

            // Allow requests from `EXPECTED_HOP` rather than from the last hop.
            let mut incoming = tunnel
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    (
                        tunnel.as_single_circ().unwrap().unique_id(),
                        EXPECTED_HOP.into(),
                    )
                        .into(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();

            // Service side: no request should ever be delivered.
            let simulate_service = async move {
                assert!(incoming.next().await.is_none());
                tunnel
            };

            // Client side: inject a single BEGIN.
            let simulate_client = async move {
                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let begin_msg = chanmsg::Relay::from(body);

                send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();

                send
            };

            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
2541
2542 #[traced_test]
2543 #[test]
2544 #[cfg(feature = "conflux")]
2545 fn multipath_circ_validation() {
2546 use std::error::Error as _;
2547
2548 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2549 let params = CircParameters::default();
2550 let invalid_tunnels = [
2551 setup_bad_conflux_tunnel(&rt).await,
2552 setup_conflux_tunnel(&rt, true, params).await,
2553 ];
2554
2555 for tunnel in invalid_tunnels {
2556 let TestTunnelCtx {
2557 tunnel: _tunnel,
2558 circs: _circs,
2559 conflux_link_rx,
2560 } = tunnel;
2561
2562 let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
2563 let err_src = conflux_hs_err.source().unwrap();
2564
2565 assert!(
2568 err_src
2569 .to_string()
2570 .contains("one more more conflux circuits are invalid")
2571 );
2572 }
2573 });
2574 }
2575
    /// The handles belonging to one fake circuit in a conflux test.
    #[derive(Debug)]
    #[allow(unused)]
    #[cfg(feature = "conflux")]
    struct TestCircuitCtx {
        /// Receiving end of the fake channel, as returned by `working_fake_channel`.
        chan_rx: Receiver<AnyChanCell>,
        /// Sending end of the fake channel, as returned by `working_fake_channel`.
        chan_tx: Sender<std::result::Result<AnyChanCell, Error>>,
        /// Sender returned by `newtunnel_ext` for this circuit.
        circ_tx: CircuitRxSender,
        /// The `unique_id()` of the tunnel this circuit was created as.
        unique_id: UniqId,
    }
2588
    /// A conflux test tunnel together with the handles for its circuits.
    #[derive(Debug)]
    #[cfg(feature = "conflux")]
    struct TestTunnelCtx {
        /// The tunnel under test.
        tunnel: Arc<ClientTunnel>,
        /// Per-circuit handles, one entry per circuit that was set up.
        circs: Vec<TestCircuitCtx>,
        /// Receives the answer to the `CtrlMsg::LinkCircuits` request.
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
    }
2596
2597 #[cfg(feature = "conflux")]
2599 async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
2600 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2602 let rmsg = match chmsg {
2603 AnyChanMsg::Relay(r) => {
2604 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2605 .unwrap()
2606 }
2607 other => panic!("{:?}", other),
2608 };
2609 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2610
2611 let link = match rmsg {
2612 AnyRelayMsg::ConfluxLink(link) => link,
2613 _ => panic!("unexpected relay message {rmsg:?}"),
2614 };
2615
2616 assert!(streamid.is_none());
2617
2618 link
2619 }
2620
    /// Build two three-hop fake circuits and ask the first one to link the
    /// second into a conflux tunnel.
    ///
    /// If `same_hops` is true, both circuits use identical hops; otherwise the
    /// second circuit uses a different set of identities.  Both circuits are
    /// created with `params`.
    ///
    /// The returned context holds the first tunnel, the handles for both
    /// circuits, and the receiver on which the outcome of the link request
    /// will arrive.  This function does not wait for that outcome.
    #[cfg(feature = "conflux")]
    async fn setup_conflux_tunnel(
        rt: &MockRuntime,
        same_hops: bool,
        params: CircParameters,
    ) -> TestTunnelCtx {
        let hops1 = hop_details(3, 0);
        let hops2 = if same_hops {
            hops1.clone()
        } else {
            hop_details(3, 10)
        };

        let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
        let (mut tunnel1, sink1) = newtunnel_ext(
            rt,
            UniqId::new(1, 3),
            chan1,
            hops1,
            2.into(),
            params.clone(),
        )
        .await;

        let (chan2, rx2, chan_sink2) = working_fake_channel(rt);

        let (tunnel2, sink2) =
            newtunnel_ext(rt, UniqId::new(2, 4), chan2, hops2, 2.into(), params).await;

        // Shut down the second tunnel and take back its circuit, so that it
        // can be handed to the first tunnel.
        let (answer_tx, answer_rx) = oneshot::channel();
        tunnel2
            .as_single_circ()
            .unwrap()
            .command
            .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
            .unwrap();

        let circuit = answer_rx.await.unwrap().unwrap();
        rt.advance_until_stalled().await;
        assert!(tunnel2.is_closed());

        // Ask the first tunnel to link the extracted circuit.
        let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
        tunnel1
            .as_single_circ()
            .unwrap()
            .control
            .unbounded_send(CtrlMsg::LinkCircuits {
                circuits: vec![circuit],
                answer: conflux_link_tx,
            })
            .unwrap();

        let circ_ctx1 = TestCircuitCtx {
            chan_rx: rx1,
            chan_tx: chan_sink1,
            circ_tx: sink1,
            unique_id: tunnel1.unique_id(),
        };

        let circ_ctx2 = TestCircuitCtx {
            chan_rx: rx2,
            chan_tx: chan_sink2,
            circ_tx: sink2,
            unique_id: tunnel2.unique_id(),
        };

        // Mark the first tunnel as multi-path by hand.
        tunnel1.circ.is_multi_path = true;
        TestTunnelCtx {
            tunnel: Arc::new(tunnel1),
            circs: vec![circ_ctx1, circ_ctx2],
            conflux_link_rx,
        }
    }
2701
2702 #[cfg(feature = "conflux")]
2703 async fn setup_good_conflux_tunnel(
2704 rt: &MockRuntime,
2705 cc_params: CongestionControlParams,
2706 ) -> TestTunnelCtx {
2707 let same_hops = true;
2713 let flow_ctrl_params = FlowCtrlParameters::defaults_for_tests();
2714 let params = CircParameters::new(true, cc_params, flow_ctrl_params);
2715 setup_conflux_tunnel(rt, same_hops, params).await
2716 }
2717
/// Build a conflux test tunnel, asking `setup_conflux_tunnel` for legs
/// that do *not* use the same hops.
///
/// Uses the vegas congestion-control test parameters and the test-default
/// flow-control parameters.
#[cfg(feature = "conflux")]
async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
    let circ_params = CircParameters::new(
        true,
        build_cc_vegas_params(),
        FlowCtrlParameters::defaults_for_tests(),
    );

    // `false` is the `same_hops` argument.
    setup_conflux_tunnel(rt, false, circ_params).await
}
2728
/// A CONFLUX_LINKED cell delivered to a freshly created tunnel, without the
/// test having requested any linking, must leave the tunnel closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn reject_conflux_linked_before_hs() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let (chan, mut _rx, _sink) = working_fake_channel(&rt);
        let (tunnel, mut circ_tx) = newtunnel(&rt, chan).await;

        // Fabricate a LINKED message around a freshly generated nonce.
        let unsolicited = relaymsg::ConfluxLinked::new(V1LinkPayload::new(
            V1Nonce::new(&mut testing_rng()),
            V1DesiredUx::NO_OPINION,
        ))
        .into();
        circ_tx
            .send(rmsg_to_ccmsg(None, unsolicited))
            .await
            .unwrap();

        rt.advance_until_stalled().await;
        assert!(tunnel.is_closed());
    });
}
2747
/// Only the first leg answers the conflux handshake; after mock time is
/// advanced by 60 seconds the second leg must be reported as timed out.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_hs_timeout() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // `_keep_tunnel` is bound (not `_`) so the tunnel lives until the end of the test.
        let TestTunnelCtx {
            tunnel: _keep_tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

        let [mut leg1, _leg2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // Echo the payload of the LINK seen on leg 1 back as a LINKED on that leg.
        let link = await_link_payload(&mut leg1.chan_rx).await;
        let reply = rmsg_to_ccmsg(
            None,
            relaymsg::ConfluxLinked::new(link.payload().clone()).into(),
        );
        leg1.circ_tx.send(reply).await.unwrap();

        // Leg 2 stays silent while the clock moves forward.
        rt.advance_by(Duration::from_secs(60)).await;

        // One handshake result per leg.
        let outcomes: [StdResult<(), ConfluxHandshakeError>; 2] = conflux_link_rx
            .await
            .unwrap()
            .unwrap()
            .try_into()
            .unwrap();
        let [first, second] = outcomes;

        assert!(first.is_ok());
        assert_matches!(second.unwrap_err(), ConfluxHandshakeError::Timeout);
    });
}
2787
/// Each bogus handshake response sent on the second leg must fail the
/// handshake with the expected protocol error and close the tunnel.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_hs() {
    use crate::util::err::ConfluxHandshakeError;

    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // A payload built around a nonce unrelated to any LINK the client sent.
        let foreign_payload =
            V1LinkPayload::new(V1Nonce::new(&mut testing_rng()), V1DesiredUx::NO_OPINION);

        // (cell to inject, error text the handshake must report)
        let cases = [
            (
                rmsg_to_ccmsg(
                    None,
                    relaymsg::ConfluxLinked::new(foreign_payload.clone()).into(),
                ),
                "Received CONFLUX_LINKED cell with mismatched nonce",
            ),
            (
                rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(foreign_payload).into()),
                "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
            ),
            (
                rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
                "Received CONFLUX_SWITCH on unlinked circuit?!",
            ),
        ];

        for (bogus_cell, want_msg) in cases {
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

            let [mut _leg1, mut leg2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            leg2.circ_tx.send(bogus_cell).await.unwrap();

            // Exactly one result is expected here (note the array length of 1).
            let outcomes: [StdResult<(), ConfluxHandshakeError>; 1] = conflux_link_rx
                .await
                .unwrap()
                .unwrap()
                .try_into()
                .unwrap();
            let [outcome] = outcomes;

            match outcome.unwrap_err() {
                ConfluxHandshakeError::Link(Error::CircProto(got_msg)) => {
                    assert_eq!(got_msg, want_msg);
                }
                e => panic!("unexpected error: {e:?}"),
            }

            assert!(tunnel.is_closed());
        }
    });
}
2854
/// Any conflux cell (LINKED, LINK or SWITCH) received on a tunnel created
/// with `newtunnel` must leave that tunnel closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn unexpected_conflux_cell() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let payload =
            V1LinkPayload::new(V1Nonce::new(&mut testing_rng()), V1DesiredUx::NO_OPINION);

        let unexpected = [
            rmsg_to_ccmsg(None, relaymsg::ConfluxLinked::new(payload.clone()).into()),
            rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(payload).into()),
            rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
        ];

        // A fresh channel and tunnel per cell, so the cases cannot affect each other.
        for cell in unexpected {
            let (chan, mut _rx, _sink) = working_fake_channel(&rt);
            let (tunnel, mut circ_tx) = newtunnel(&rt, chan).await;

            circ_tx.send(cell).await.unwrap();
            rt.advance_until_stalled().await;

            assert!(tunnel.is_closed());
        }
    });
}
2888
/// Sending one LINKED on the first leg and two LINKED cells on the second
/// leg must leave the tunnel closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_linked() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // `conflux_link_rx: _` leaves the receiver unbound, so it is dropped right away.
        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx: _,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

        let [mut leg1, mut leg2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        let link = await_link_payload(&mut leg1.chan_rx).await;

        // One LINKED on the first leg...
        let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
        leg1.circ_tx
            .send(rmsg_to_ccmsg(None, linked))
            .await
            .unwrap();

        // ...followed by two on the second leg.
        for _ in 0..2 {
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            leg2.circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
        }

        rt.advance_until_stalled().await;

        assert!(tunnel.is_closed());
    });
}
2933
/// After a successful handshake on both legs, a SWITCH cell with seqno `0`
/// or with seqno `cwnd_init + 1` must leave the tunnel closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_switch() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let vegas = build_cc_vegas_params();
        let initial_cwnd = vegas.cwnd_params().cwnd_init();
        let rejected_switches = [
            relaymsg::ConfluxSwitch::new(0),
            relaymsg::ConfluxSwitch::new(initial_cwnd + 1),
        ];

        for switch in rejected_switches {
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt, vegas.clone()).await;

            let [mut leg1, mut leg2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            // Complete the handshake: reply LINKED on both legs with the LINK payload.
            let link = await_link_payload(&mut leg1.chan_rx).await;
            for leg in [&mut leg1, &mut leg2] {
                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
                leg.circ_tx
                    .send(rmsg_to_ccmsg(None, linked))
                    .await
                    .unwrap();
            }

            let hs_results = conflux_link_rx.await.unwrap().unwrap();
            assert!(hs_results.iter().all(|outcome| outcome.is_ok()));

            // Now inject the offending SWITCH on the first leg.
            leg1.circ_tx
                .send(rmsg_to_ccmsg(None, switch.into()))
                .await
                .unwrap();

            rt.advance_until_stalled().await;
            assert!(tunnel.is_closed());
        }
    });
}
2983
/// After a successful handshake, a first SWITCH (seqno 10) on leg 1 leaves
/// the tunnel open, while a second SWITCH (seqno 12) sent right after it on
/// the same leg leaves the tunnel closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_consecutive_switch() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

        let [mut leg1, mut leg2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // Complete the handshake on both legs.
        let link = await_link_payload(&mut leg1.chan_rx).await;
        for leg in [&mut leg1, &mut leg2] {
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            leg.circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
        }

        let hs_results = conflux_link_rx.await.unwrap().unwrap();
        assert!(hs_results.iter().all(|outcome| outcome.is_ok()));

        // (seqno of the SWITCH to send, whether the tunnel must be closed afterwards)
        for (seqno, expect_closed) in [(10, false), (12, true)] {
            let switch = relaymsg::ConfluxSwitch::new(seqno);
            leg1.circ_tx
                .send(rmsg_to_ccmsg(None, switch.into()))
                .await
                .unwrap();

            rt.advance_until_stalled().await;
            assert_eq!(tunnel.is_closed(), expect_closed);
        }
    });
}
3031
/// `CtrlCmd::ShutdownAndReturnCircuit` sent to a two-leg tunnel must answer
/// with a "not a single leg conflux set" error, and the tunnel must end up
/// closed.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn shutdown_and_return_circ_multipath() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        const MSG: &str = "not a single leg conflux set (got at least 2 elements when exactly one was expected)";

        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx: _,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

        rt.progress_until_stalled().await;

        let (reply_tx, reply_rx) = oneshot::channel();
        let shutdown = CtrlCmd::ShutdownAndReturnCircuit { answer: reply_tx };
        tunnel.circ.command.unbounded_send(shutdown).unwrap();

        // Discard the success value before `unwrap_err`, which needs the `Ok` type
        // to be `Debug`.
        let reply = reply_rx.await.unwrap();
        let err = reply.map(|_| ()).unwrap_err();
        assert!(err.to_string().contains(MSG), "{err}");

        rt.progress_until_stalled().await;
        assert!(tunnel.is_closed());

        // The per-leg contexts are held until this point and released explicitly.
        drop(circs);
    });
}
3079
/// One participant in a conflux data-transfer test, as consumed by
/// `run_conflux_endpoint`.
///
/// `I` is the iterator of optional delays carried by the relay variant's
/// `ConfluxExitState`.
#[cfg(feature = "conflux")]
#[derive(Debug)]
enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
    /// A mock exit, driven by `run_mock_conflux_exit`.
    Relay(ConfluxExitState<I>),
    /// The client side, driven by `run_conflux_client`.
    Client {
        /// Receives the result of the conflux handshake.
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
        /// The tunnel on which the client opens its stream.
        tunnel: Arc<ClientTunnel>,
        /// The bytes the client writes to the stream.
        send_data: Vec<u8>,
        /// The bytes the client expects to read back from the stream.
        recv_data: Vec<u8>,
    },
}
3098
/// What an endpoint task hands back when it finishes.
///
/// NOTE(review): the fields do not appear to be read by the callers visible
/// here (hence `allow(unused)`); presumably they are returned only so the
/// contained objects stay alive until the caller drops the results.
#[allow(unused, clippy::large_enum_variant)]
#[derive(Debug)]
#[cfg(feature = "conflux")]
enum ConfluxEndpointResult {
    /// Result of the client endpoint.
    Circuit {
        /// The tunnel the client used.
        tunnel: Arc<ClientTunnel>,
        /// The stream the client opened on the tunnel.
        stream: DataStream,
    },
    /// Result of a mock exit endpoint.
    Relay {
        /// The circuit context of the leg the mock exit was serving.
        circ: TestCircuitCtx,
    },
}
3113
/// Bookkeeping for the single test stream, shared between the mock exit legs.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxStreamState {
    /// The payload bytes of the DATA cells received so far.
    data_recvd: Vec<u8>,
    /// The number of data bytes the exits expect to receive in total.
    expected_data_len: usize,
    /// Whether a BEGIN cell has been received.
    begin_recvd: bool,
    /// Whether an END cell has been received.
    end_recvd: bool,
    /// Whether an exit leg has sent an END cell.
    end_sent: bool,
}
3129
#[cfg(feature = "conflux")]
impl ConfluxStreamState {
    /// Create the state for a stream on which `expected_data_len` bytes are
    /// expected: nothing received yet and every flag cleared.
    fn new(expected_data_len: usize) -> Self {
        let data_recvd = Vec::new();

        Self {
            expected_data_len,
            data_recvd,
            begin_recvd: false,
            end_recvd: false,
            end_sent: false,
        }
    }
}
3142
/// A CONFLUX_SWITCH cell that a mock exit leg expects to receive.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ExpectedSwitch {
    /// The number of cells the leg must have received, counting the SWITCH
    /// itself, at the moment the SWITCH arrives.
    cells_so_far: usize,
    /// The sequence number the SWITCH is expected to carry.
    seqno: u32,
}
3154
/// Feeds a scripted sequence of cells to the mock exit legs, one at a time.
#[cfg(feature = "conflux")]
struct CellDispatcher {
    /// For each leg, the channel on which cells are handed to that leg.
    leg_tx: HashMap<UniqId, mpsc::Sender<CellToSend>>,
    /// The cells to dispatch, in order, each tagged with the leg that must send it.
    cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
}
3167
#[cfg(feature = "conflux")]
impl CellDispatcher {
    /// Deliver every queued cell, in order, to the leg it is addressed to.
    ///
    /// Each cell is handed to the leg's channel together with a oneshot
    /// sender; the next cell is not dispatched until the leg has signalled
    /// completion on that oneshot.
    ///
    /// The per-leg senders are dropped when this function returns.
    ///
    /// # Panics
    ///
    /// Panics if a cell names a leg with no registered channel, if the leg's
    /// channel cannot accept the cell, or if the leg drops the completion
    /// sender without signalling.
    async fn run(self) {
        let Self {
            mut leg_tx,
            cells_to_send,
        } = self;

        // Iterate by value instead of calling `remove(0)` in a loop, which
        // would shift the remaining elements on every iteration.
        for (circ_id, cell) in cells_to_send {
            let cell_tx = leg_tx.get_mut(&circ_id).unwrap();
            let (done_tx, done_rx) = oneshot::channel();
            cell_tx.send(CellToSend { done_tx, cell }).await.unwrap();
            // Wait for the leg to confirm before dispatching the next cell.
            let () = done_rx.await.unwrap();
        }
    }
}
3181
/// A cell handed by the `CellDispatcher` to a mock exit leg for sending.
#[cfg(feature = "conflux")]
#[derive(Debug)]
struct CellToSend {
    /// Signalled by the leg once it has sent the cell.
    done_tx: oneshot::Sender<()>,
    /// The message to send.
    cell: AnyRelayMsg,
}
3191
/// Everything one mock exit leg needs to run (see `run_mock_conflux_exit`).
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
    /// The mock runtime, behind an async mutex so that the legs sharing it
    /// advance time one at a time.
    runtime: Arc<AsyncMutex<MockRuntime>>,
    /// The client tunnel; the leg uses it to query the client's send window.
    tunnel: Arc<ClientTunnel>,
    /// The channel/circuit plumbing of this leg.
    circ: TestCircuitCtx,
    /// Optional delays by which mock time is advanced: the tests take the
    /// first item for the handshake, and each later item is consumed just
    /// before a SENDME is sent.
    rtt_delays: I,
    /// Stream bookkeeping shared with the other leg.
    stream_state: Arc<Mutex<ConfluxStreamState>>,
    /// The SWITCH cells this leg expects to receive, in order.
    expect_switch: Vec<ExpectedSwitch>,
    /// Events coming from the other leg.
    event_rx: mpsc::Receiver<MockExitEvent>,
    /// Events going to the other leg.
    event_tx: mpsc::Sender<MockExitEvent>,
    /// Whether this leg sends the END cell when no END has been received
    /// by the time its loop finishes.
    is_sending_leg: bool,
    /// Cells this leg is asked to send towards the client.
    cells_rx: mpsc::Receiver<CellToSend>,
}
3226
/// Play the exit side of a successful conflux handshake on one leg.
///
/// Waits for the LINK payload on `rx`, optionally advances the mock clock by
/// `init_rtt_delay`, replies on `sink` with a LINKED cell echoing that
/// payload, and finally asserts that the next cell read from `rx` is a
/// CONFLUX_LINKED_ACK.
///
/// # Panics
///
/// Panics if the channel yields no further cell, if the next cell is not a
/// relay cell that decodes as a single V0 message, or if it is not a
/// CONFLUX_LINKED_ACK.
#[cfg(feature = "conflux")]
async fn good_exit_handshake(
    runtime: &Arc<AsyncMutex<MockRuntime>>,
    init_rtt_delay: Option<Duration>,
    rx: &mut Receiver<ChanCell<AnyChanMsg>>,
    sink: &mut CircuitRxSender,
) {
    let link = await_link_payload(rx).await;

    // Let mock time pass between receiving the LINK and replying to it.
    if let Some(delay) = init_rtt_delay {
        let clock = runtime.lock().await;
        clock.advance_by(delay).await;
    }

    let reply = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
    sink.send(rmsg_to_ccmsg(None, reply)).await.unwrap();

    // The next cell on the channel must be the ack for our LINKED.
    let (_circid, chanmsg) = rx.next().await.unwrap().into_circid_and_msg();
    let decoded = match chanmsg {
        AnyChanMsg::Relay(relay) => {
            let body = relay.into_relay_body();
            AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, body).unwrap()
        }
        other => panic!("{other:?}"),
    };
    let (_stream, ack) = decoded.into_streamid_and_msg();

    assert_matches!(ack, AnyRelayMsg::ConfluxLinkedAck(_));
}
3260
/// A notification exchanged between the two mock exit legs.
#[derive(Copy, Clone, Debug)]
enum MockExitEvent {
    /// The sending leg has finished its loop.
    Done,
    /// The sending leg received a BEGIN for the stream with this ID.
    BeginRecvd(StreamId),
}
3269
/// Run one leg of a mock conflux exit until the test stream is finished.
///
/// The leg repeatedly waits for one of three things:
///
///  * a cell sent by the client on this leg's channel, which is decoded and
///    handled (BEGIN, END, CONFLUX_SWITCH, DATA);
///  * an event from the other leg (`Done`, or the stream ID of the BEGIN it saw);
///  * a cell from `cells_rx` that this leg must send to the client
///    (only once the stream ID is known and while there may be more to send).
///
/// After the loop, the leg sends an END if it is the sending leg and none was
/// received, tells the other leg it is done, and checks that every expected
/// SWITCH was seen.
///
/// Returns the leg's circuit context wrapped in `ConfluxEndpointResult::Relay`.
///
/// # Panics
///
/// Panics on any unexpected cell, on a second BEGIN or END, on a SWITCH that
/// does not match the next `ExpectedSwitch`, or if expected SWITCH cells
/// remain when the leg finishes.
#[cfg(feature = "conflux")]
async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
    state: ConfluxExitState<I>,
) -> ConfluxEndpointResult {
    let ConfluxExitState {
        runtime,
        tunnel,
        mut circ,
        rtt_delays,
        stream_state,
        mut expect_switch,
        mut event_tx,
        mut event_rx,
        is_sending_leg,
        mut cells_rx,
    } = state;

    let mut rtt_delays = rtt_delays.into_iter();

    // The number of data bytes the exits expect to receive on the stream.
    let stream_len = stream_state.lock().unwrap().expected_data_len;
    // DATA cells received on this leg.
    let mut data_cells_received = 0_usize;
    // All cells received on this leg (of any type).
    let mut cell_count = 0_usize;
    // SENDME tags obtained from the client reactor and not yet used.
    let mut tags = vec![];
    // The ID of the test stream, once known.
    let mut streamid = None;
    // Set once `cells_rx` is exhausted, i.e. there is nothing left to send.
    let mut done_writing = false;

    loop {
        // Stop once the stream was opened (or ended), all expected data
        // arrived, and there is nothing left for this leg to send.
        let should_exit = {
            let stream_state = stream_state.lock().unwrap();
            let done_reading = stream_state.data_recvd.len() >= stream_len;

            (stream_state.begin_recvd || stream_state.end_recvd) && done_reading && done_writing
        };

        if should_exit {
            break;
        }

        use futures::select;

        // Only poll for outgoing cells once we know which stream they belong
        // to; otherwise use a future that never resolves, so this select
        // branch stays idle.
        let mut next_cell = if streamid.is_some() && !done_writing {
            Box::pin(cells_rx.next().fuse())
                as Pin<Box<dyn FusedFuture<Output = Option<CellToSend>> + Send>>
        } else {
            Box::pin(std::future::pending().fuse())
        };

        let res = select! {
            // A cell the client sent on this leg's channel: handled below.
            res = circ.chan_rx.next() => {
                res.unwrap()
            },
            // An event from the other leg.
            res = event_rx.next() => {
                // The other leg dropped its sender.
                let Some(event) = res else {
                    break;
                };

                match event {
                    MockExitEvent::Done => {
                        break;
                    },
                    MockExitEvent::BeginRecvd(id) => {
                        // Learn the stream ID from the leg that saw the BEGIN.
                        streamid = Some(id);
                        continue;
                    },
                }
            }
            // A cell we were asked to send to the client.
            res = next_cell => {
                if let Some(cell_to_send) = res {
                    let CellToSend { cell, done_tx } = cell_to_send;

                    // SWITCH cells are sent without a stream ID.
                    let streamid = if matches!(cell, AnyRelayMsg::ConfluxSwitch(_)) {
                        None
                    } else {
                        streamid
                    };

                    circ.circ_tx
                        .send(rmsg_to_ccmsg(streamid, cell))
                        .await
                        .unwrap();

                    // Advance the runtime until it stalls before reporting
                    // the cell as sent.
                    runtime.lock().await.advance_until_stalled().await;
                    done_tx.send(()).unwrap();
                } else {
                    // The sender side is gone: nothing more to send.
                    done_writing = true;
                }

                continue;
            }
        };

        // Decode the cell received from the client.
        let (_id, chmsg) = res.into_circid_and_msg();
        cell_count += 1;
        let rmsg = match chmsg {
            AnyChanMsg::Relay(r) => {
                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                    .unwrap()
            }
            other => panic!("{:?}", other),
        };
        let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
        // Adopt the stream ID of the first cell we see, unless already known.
        if streamid.is_none() {
            streamid = new_streamid;
        }

        let begin_recvd = stream_state.lock().unwrap().begin_recvd;
        let end_recvd = stream_state.lock().unwrap().end_recvd;
        match rmsg {
            AnyRelayMsg::Begin(_) if begin_recvd => {
                panic!("client tried to open two streams?!");
            }
            AnyRelayMsg::Begin(_) if !begin_recvd => {
                stream_state.lock().unwrap().begin_recvd = true;
                // Accept the stream...
                let connected = relaymsg::Connected::new_empty().into();
                circ.circ_tx
                    .send(rmsg_to_ccmsg(streamid, connected))
                    .await
                    .unwrap();
                // ...and tell the other leg which stream ID is in use.
                event_tx
                    .send(MockExitEvent::BeginRecvd(streamid.unwrap()))
                    .await
                    .unwrap();
            }
            AnyRelayMsg::End(_) if !end_recvd => {
                stream_state.lock().unwrap().end_recvd = true;
                break;
            }
            AnyRelayMsg::End(_) if end_recvd => {
                panic!("received two END cells for the same stream?!");
            }
            AnyRelayMsg::ConfluxSwitch(cell) => {
                // Panics if no SWITCH was expected at this point.
                let expected = expect_switch.remove(0);

                assert_eq!(expected.cells_so_far, cell_count);
                assert_eq!(expected.seqno, cell.seqno());

                continue;
            }
            AnyRelayMsg::Data(dat) => {
                data_cells_received += 1;
                stream_state
                    .lock()
                    .unwrap()
                    .data_recvd
                    .extend_from_slice(dat.as_ref());

                // A SENDME is sent after every 31st DATA cell on this leg.
                // NOTE(review): 31 is presumably the congestion-control
                // SENDME increment — confirm.
                let is_next_cell_sendme = data_cells_received.is_multiple_of(31);
                if is_next_cell_sendme {
                    if tags.is_empty() {
                        // Advance the runtime until it stalls, then ask the
                        // client reactor for the SENDME tags for hop 2 of
                        // this leg.
                        runtime.lock().await.advance_until_stalled().await;
                        let (tx, rx) = oneshot::channel();
                        tunnel
                            .circ
                            .command
                            .unbounded_send(CtrlCmd::QuerySendWindow {
                                hop: 2.into(),
                                leg: circ.unique_id,
                                done: tx,
                            })
                            .unwrap();

                        let (_window, new_tags) = rx.await.unwrap().unwrap();
                        tags = new_tags;
                    }

                    // Use the oldest outstanding tag.
                    let tag = tags.remove(0);

                    // Advance mock time by the next configured delay, if any,
                    // before sending the SENDME.
                    if let Some(rtt_delay) = rtt_delays.next().flatten() {
                        runtime.lock().await.advance_by(rtt_delay).await;
                    }
                    let sendme = relaymsg::Sendme::from(tag).into();

                    circ.circ_tx
                        .send(rmsg_to_ccmsg(None, sendme))
                        .await
                        .unwrap();
                }
            }
            _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
        }
    }

    let end_recvd = stream_state.lock().unwrap().end_recvd;

    // If no END was received, the designated sending leg sends one itself.
    if is_sending_leg && !end_recvd {
        let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
        circ.circ_tx
            .send(rmsg_to_ccmsg(streamid, end))
            .await
            .unwrap();
        stream_state.lock().unwrap().end_sent = true;
    }

    // Tell the other leg we are done. The result is ignored, so a failed
    // send (e.g. the other leg's receiver being gone) is not an error.
    let _ = event_tx.send(MockExitEvent::Done).await;

    // Every expected SWITCH must have been consumed by now.
    assert!(
        expect_switch.is_empty(),
        "expect_switch = {expect_switch:?}"
    );

    ConfluxEndpointResult::Relay { circ }
}
3499
/// Run the client side of a conflux data-transfer test.
///
/// Waits for the conflux handshake to complete (expecting two results),
/// opens a stream to `www.example.com:443` on `tunnel`, writes `send_data`,
/// then reads the stream to its end and asserts that exactly `recv_data`
/// was received.
///
/// Returns the tunnel and the stream wrapped in
/// `ConfluxEndpointResult::Circuit`.
#[cfg(feature = "conflux")]
async fn run_conflux_client(
    tunnel: Arc<ClientTunnel>,
    conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
    send_data: Vec<u8>,
    recv_data: Vec<u8>,
) -> ConfluxEndpointResult {
    let hs_outcomes = conflux_link_rx.await.unwrap().unwrap();
    assert_eq!(hs_outcomes.len(), 2);

    let mut stream = tunnel
        .begin_stream("www.example.com", 443, None)
        .await
        .unwrap();

    // Upload everything first...
    stream.write_all(&send_data).await.unwrap();
    stream.flush().await.unwrap();

    // ...then read until the stream ends.
    let mut received = Vec::<u8>::new();
    let n_read = stream.read_to_end(&mut received).await.unwrap();
    assert_eq!(n_read, recv_data.len());
    assert_eq!(recv_data, received);

    ConfluxEndpointResult::Circuit { tunnel, stream }
}
3531
/// Drive a single test endpoint to completion.
///
/// Relay endpoints are handed to `run_mock_conflux_exit`; the client
/// endpoint is handed to `run_conflux_client`.
#[cfg(feature = "conflux")]
async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
    endpoint: ConfluxTestEndpoint<I>,
) -> ConfluxEndpointResult {
    match endpoint {
        ConfluxTestEndpoint::Client {
            tunnel: client_tunnel,
            conflux_link_rx: hs_rx,
            send_data: to_send,
            recv_data: to_recv,
        } => run_conflux_client(client_tunnel, hs_rx, to_send, to_recv).await,
        ConfluxTestEndpoint::Relay(exit_state) => run_mock_conflux_exit(exit_state).await,
    }
}
3546
// Client-to-exit transfer over a two-leg conflux tunnel.
//
// The client uploads NUM_CELLS * CELL_SIZE bytes while two mock exits, one
// per leg, collect the DATA cells and answer with SENDMEs after the
// configured delays. Each exit asserts it received the SWITCH cells listed
// for it, and at the end the test checks that the exits collectively
// received exactly the bytes that were sent (compared after sorting).
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn multipath_client_to_exit() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // The amount of data sent is NUM_CELLS * CELL_SIZE bytes.
        const NUM_CELLS: usize = 300;
        // NOTE(review): presumably the maximum DATA cell payload length — confirm.
        const CELL_SIZE: usize = 498;

        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
        let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // The payload: the byte values 0..=254, repeated.
        let mut send_data = (0..255_u8)
            .cycle()
            .take(NUM_CELLS * CELL_SIZE)
            .collect::<Vec<_>>();
        let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));

        let mut tasks = vec![];

        // Event channels through which the two mock exits notify each other
        // (each relay gets one sender and the receiver of the other pair).
        let (tx1, rx1) = mpsc::channel(1);
        let (tx2, rx2) = mpsc::channel(1);

        // Delays for leg 1. The first item is consumed by the handshake below;
        // each remaining one is applied before a SENDME is sent.
        let circ1_rtt_delays = [
            Some(Duration::from_millis(100)),
            Some(Duration::from_millis(500)),
            Some(Duration::from_millis(700)),
            Some(Duration::from_millis(900)),
            Some(Duration::from_millis(1100)),
            Some(Duration::from_millis(1300)),
            Some(Duration::from_millis(1500)),
            Some(Duration::from_millis(1700)),
            Some(Duration::from_millis(1900)),
            Some(Duration::from_millis(2100)),
        ]
        .into_iter();

        // Delays for leg 2, consumed in the same way.
        let circ2_rtt_delays = [
            Some(Duration::from_millis(200)),
            Some(Duration::from_millis(400)),
            Some(Duration::from_millis(600)),
            Some(Duration::from_millis(800)),
            Some(Duration::from_millis(1000)),
            Some(Duration::from_millis(1200)),
            Some(Duration::from_millis(1400)),
            Some(Duration::from_millis(1600)),
            Some(Duration::from_millis(1800)),
            Some(Duration::from_millis(2000)),
        ]
        .into_iter();

        // The single SWITCH expected on leg 1.
        // NOTE(review): these values follow from the congestion-control
        // parameters and the delays above; re-derive them if either changes.
        let expected_switches1 = vec![ExpectedSwitch {
            cells_so_far: 126,
            seqno: 124,
        }];

        // The single SWITCH expected on leg 2: it is the first cell the mock
        // exit receives on that leg after the handshake.
        let expected_switches2 = vec![ExpectedSwitch {
            cells_so_far: 1,
            seqno: 125,
        }];

        let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));

        // The senders are discarded immediately, so these receivers never
        // yield a cell: the exits have nothing to send in this test.
        let (_, cells_rx1) = mpsc::channel(1);
        let (_, cells_rx2) = mpsc::channel(1);

        let relay1 = ConfluxExitState {
            runtime: Arc::clone(&relay_runtime),
            tunnel: Arc::clone(&tunnel),
            circ: circ1,
            rtt_delays: circ1_rtt_delays,
            stream_state: Arc::clone(&stream_state),
            expect_switch: expected_switches1,
            event_tx: tx1,
            event_rx: rx2,
            is_sending_leg: true,
            cells_rx: cells_rx1,
        };

        let relay2 = ConfluxExitState {
            runtime: Arc::clone(&relay_runtime),
            tunnel: Arc::clone(&tunnel),
            circ: circ2,
            rtt_delays: circ2_rtt_delays,
            stream_state: Arc::clone(&stream_state),
            expect_switch: expected_switches2,
            event_tx: tx2,
            event_rx: rx1,
            is_sending_leg: false,
            cells_rx: cells_rx2,
        };

        // Complete the handshake on each leg (using the first delay of that
        // leg), then spawn the task serving it.
        for mut mock_relay in [relay1, relay2] {
            let leg = mock_relay.circ.unique_id;

            good_exit_handshake(
                &relay_runtime,
                mock_relay.rtt_delays.next().flatten(),
                &mut mock_relay.circ.chan_rx,
                &mut mock_relay.circ.circ_tx,
            )
            .await;

            let relay = ConfluxTestEndpoint::Relay(mock_relay);

            tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
        }

        // The client uploads `send_data` and expects to read nothing back.
        tasks.push(rt.spawn_join(
            "client task".to_string(),
            run_conflux_endpoint(ConfluxTestEndpoint::Client {
                tunnel,
                conflux_link_rx,
                send_data: send_data.clone(),
                recv_data: vec![],
            }),
        ));
        let _sinks = futures::future::join_all(tasks).await;
        let mut stream_state = stream_state.lock().unwrap();
        assert!(stream_state.begin_recvd);

        // Both sides are sorted before comparing, so this checks that the
        // same bytes arrived, not that they arrived in the same order.
        stream_state.data_recvd.sort();
        send_data.sort();
        assert_eq!(stream_state.data_recvd, send_data);
    });
}
3733
/// Run an exit-to-client transfer over the two-leg conflux tunnel `tunnel`.
///
/// `cells_to_send` is the script of cells the mock exits send to the client:
/// each entry names the leg that must send the cell. A `CellDispatcher`
/// hands the cells to the legs one at a time, in order.
///
/// The client writes `send_data` to its stream and asserts that it reads
/// back exactly `recv_data`.
///
/// Returns the stream state shared by the two mock exits, once all
/// endpoint tasks have finished.
///
/// # Panics
///
/// Panics if the tunnel does not have exactly two legs, or if the
/// dispatcher task cannot be spawned.
#[cfg(feature = "conflux")]
async fn run_multipath_exit_to_client_test(
    rt: MockRuntime,
    tunnel: TestTunnelCtx,
    cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
    send_data: Vec<u8>,
    recv_data: Vec<u8>,
) -> Arc<Mutex<ConfluxStreamState>> {
    let TestTunnelCtx {
        tunnel,
        circs,
        conflux_link_rx,
    } = tunnel;
    let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

    let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));

    let mut tasks = vec![];
    let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
    // Channels through which the dispatcher hands cells to each leg.
    let (cells_tx1, cells_rx1) = mpsc::channel(1);
    let (cells_tx2, cells_rx2) = mpsc::channel(1);

    let dispatcher = CellDispatcher {
        leg_tx: [(circ1.unique_id, cells_tx1), (circ2.unique_id, cells_tx2)]
            .into_iter()
            .collect(),
        cells_to_send,
    };

    // Event channels through which the two mock exits notify each other.
    let (tx1, rx1) = mpsc::channel(1);
    let (tx2, rx2) = mpsc::channel(1);

    let relay1 = ConfluxExitState {
        runtime: Arc::clone(&relay_runtime),
        tunnel: Arc::clone(&tunnel),
        circ: circ1,
        // No delays are configured on either leg.
        rtt_delays: [].into_iter(),
        stream_state: Arc::clone(&stream_state),
        // No SWITCH from the client is expected on either leg.
        expect_switch: vec![],
        event_tx: tx1,
        event_rx: rx2,
        is_sending_leg: false,
        cells_rx: cells_rx1,
    };

    let relay2 = ConfluxExitState {
        runtime: Arc::clone(&relay_runtime),
        tunnel: Arc::clone(&tunnel),
        circ: circ2,
        rtt_delays: [].into_iter(),
        stream_state: Arc::clone(&stream_state),
        expect_switch: vec![],
        event_tx: tx2,
        event_rx: rx1,
        is_sending_leg: true,
        cells_rx: cells_rx2,
    };

    // The dispatcher runs detached; it finishes when its script is exhausted.
    rt.spawn(dispatcher.run()).unwrap();

    // Complete the handshake on each leg, then spawn the task serving it.
    for mut mock_relay in [relay1, relay2] {
        let leg = mock_relay.circ.unique_id;

        good_exit_handshake(
            &relay_runtime,
            mock_relay.rtt_delays.next().flatten(),
            &mut mock_relay.circ.chan_rx,
            &mut mock_relay.circ.circ_tx,
        )
        .await;

        let relay = ConfluxTestEndpoint::Relay(mock_relay);

        tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
    }

    // `send_data` is not needed afterwards, so it is moved rather than cloned.
    tasks.push(rt.spawn_join(
        "client task".to_string(),
        run_conflux_endpoint(ConfluxTestEndpoint::Client {
            tunnel,
            conflux_link_rx,
            send_data,
            recv_data,
        }),
    ));

    let _sinks = futures::future::join_all(tasks).await;

    stream_state
}
3843
/// Exit-to-client transfer over a two-leg conflux tunnel.
///
/// The mock exits send `TO_SEND` to the client in pieces, spread over both
/// legs and interleaved with SWITCH cells, with some pieces sent out of
/// order. The client must nevertheless read back exactly `TO_SEND` (checked
/// inside `run_conflux_client`), while the exits must have seen the BEGIN
/// and no data at all.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn multipath_exit_to_client() {
    // The data the exits send to the client.
    const TO_SEND: &[u8] =
        b"But something about Buster Friendly irritated John Isidore, one specific thing";

    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // Indices into the tunnel's list of legs.
        const CIRC1: usize = 0;
        const CIRC2: usize = 1;

        // A single switch from leg 1 to leg 2, with leg 2 sending one piece
        // before leg 1 has sent the piece that precedes it.
        let simple_switch = vec![
            (CIRC1, relaymsg::Data::new(&TO_SEND[0..5]).unwrap().into()),
            (CIRC1, relaymsg::Data::new(&TO_SEND[5..10]).unwrap().into()),
            (CIRC2, relaymsg::ConfluxSwitch::new(4).into()),
            (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
            (CIRC1, relaymsg::Data::new(&TO_SEND[10..20]).unwrap().into()),
            (CIRC2, relaymsg::Data::new(&TO_SEND[30..40]).unwrap().into()),
            (CIRC2, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
        ];

        // Several switches back and forth between the legs, again with
        // pieces arriving ahead of the ones that precede them.
        let multiple_switches = vec![
            (CIRC2, relaymsg::ConfluxSwitch::new(3).into()),
            (CIRC2, relaymsg::Data::new(&TO_SEND[15..20]).unwrap().into()),
            (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
            (CIRC1, relaymsg::Data::new(&TO_SEND[0..10]).unwrap().into()),
            (CIRC1, relaymsg::Data::new(&TO_SEND[10..15]).unwrap().into()),
            (CIRC1, relaymsg::ConfluxSwitch::new(3).into()),
            (CIRC1, relaymsg::Data::new(&TO_SEND[31..40]).unwrap().into()),
            (CIRC2, relaymsg::Data::new(&TO_SEND[30..31]).unwrap().into()),
            (CIRC1, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
            (CIRC2, relaymsg::ConfluxSwitch::new(2).into()),
        ];

        let tests = [simple_switch, multiple_switches];

        for cells_to_send in tests {
            // A fresh tunnel per script.
            let tunnel = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
            assert_eq!(tunnel.circs.len(), 2);
            // Translate the leg indices used in the script into leg IDs.
            let circ_ids = [tunnel.circs[0].unique_id, tunnel.circs[1].unique_id];
            let cells_to_send = cells_to_send
                .into_iter()
                .map(|(i, cell)| (circ_ids[i], cell))
                .collect();

            // The client sends nothing (empty `send_data`) and expects to
            // receive all of `TO_SEND`.
            let stream_state = run_multipath_exit_to_client_test(
                rt.clone(),
                tunnel,
                cells_to_send,
                Vec::new(),
                TO_SEND.into(),
            )
            .await;
            let stream_state = stream_state.lock().unwrap();
            assert!(stream_state.begin_recvd);
            assert!(stream_state.data_recvd.is_empty());
        }
    });
}
3995
/// `allow_stream_requests` on a linked two-leg tunnel must fail, with an
/// error whose source says stream requests cannot be allowed on a
/// multi-path tunnel.
#[traced_test]
#[test]
#[cfg(all(feature = "conflux", feature = "hs-service"))]
fn conflux_incoming_stream() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        use std::error::Error as _;

        const EXPECTED_HOP: u8 = 1;

        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;

        let [mut leg1, mut leg2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // Complete the handshake on both legs.
        let link = await_link_payload(&mut leg1.chan_rx).await;
        for leg in [&mut leg1, &mut leg2] {
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            leg.circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
        }

        let hs_results = conflux_link_rx.await.unwrap().unwrap();
        assert!(hs_results.iter().all(|outcome| outcome.is_ok()));

        let accepted_cmds = [tor_cell::relaycell::RelayCmd::BEGIN];
        let outcome = tunnel
            .allow_stream_requests(
                &accepted_cmds,
                (tunnel.circ.unique_id(), EXPECTED_HOP.into()).into(),
                AllowAllStreamsFilter,
            )
            .await;
        // Discard the success value before extracting the error.
        let err = outcome.map(|_| ()).unwrap_err();

        let err_src = err.source().unwrap().to_string();
        assert!(
            err_src.contains("Cannot allow stream requests on a multi-path tunnel"),
            "{err_src}"
        );
    });
}
4044
/// Check which channel messages convert into a `ClientCircChanMsg`:
/// DESTROY and RELAY must be accepted; CREATED_FAST, CREATED2, RELAY_EARLY
/// and VERSIONS must be rejected.
#[test]
fn client_circ_chan_msg() {
    use tor_cell::chancell::msg::{self, AnyChanMsg};

    /// Whether `m` converts into a `ClientCircChanMsg`.
    fn accepted(m: AnyChanMsg) -> bool {
        ClientCircChanMsg::try_from(m).is_ok()
    }

    assert!(accepted(msg::Destroy::new(2.into()).into()));
    assert!(!accepted(
        msg::CreatedFast::new(&b"guaranteed in this world"[..]).into()
    ));
    assert!(!accepted(msg::Created2::new(&b"and the next"[..]).into()));
    assert!(accepted(
        msg::Relay::new(&b"guaranteed guaranteed"[..]).into()
    ));
    assert!(!accepted(msg::AnyChanMsg::RelayEarly(
        msg::Relay::new(&b"for the world and its mother"[..]).into(),
    )));
    assert!(!accepted(msg::Versions::new([1, 2, 3]).unwrap().into()));
}
4064}