1pub(crate) mod celltypes;
37pub(crate) mod halfcirc;
38
39#[cfg(feature = "hs-common")]
40pub mod handshake;
41#[cfg(not(feature = "hs-common"))]
42pub(crate) mod handshake;
43
44pub(super) mod path;
45pub(crate) mod unique_id;
46
47use crate::channel::Channel;
48use crate::congestion::params::CongestionControlParams;
49use crate::crypto::cell::HopNum;
50#[cfg(feature = "ntor_v3")]
51use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
52use crate::memquota::{CircuitAccount, SpecificAccount as _};
53use crate::stream::{
54 AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
55 StreamReader,
56};
57use crate::tunnel::circuit::celltypes::*;
58use crate::tunnel::reactor::CtrlCmd;
59use crate::tunnel::reactor::{
60 CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
61};
62use crate::tunnel::StreamTarget;
63use crate::util::skew::ClockSkew;
64use crate::{Error, ResolveError, Result};
65use educe::Educe;
66use tor_cell::{
67 chancell::CircId,
68 relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
69};
70
71use tor_error::{internal, into_internal};
72use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
73
74pub use crate::crypto::binding::CircuitBinding;
75pub use crate::memquota::StreamAccount;
76pub use crate::tunnel::circuit::unique_id::UniqId;
77
78#[cfg(feature = "hs-service")]
79use {
80 crate::stream::{IncomingCmdChecker, IncomingStream},
81 crate::tunnel::reactor::StreamReqInfo,
82};
83
84use futures::channel::mpsc;
85use oneshot_fused_workaround as oneshot;
86
87use crate::congestion::sendme::StreamRecvWindow;
88use crate::DynTimeProvider;
89use futures::FutureExt as _;
90use std::net::IpAddr;
91use std::sync::{Arc, Mutex};
92use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
93
94use crate::crypto::handshake::ntor::NtorPublicKey;
95
96pub use path::{Path, PathEntry};
97
/// Queue capacity handed to `MpscSpec::new` when `ClientCirc::begin_stream_impl`
/// creates the queue whose sending half is stored in a `StreamTarget`.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
100
101#[cfg(feature = "send-control-msg")]
102use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
103
104pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
105#[cfg(feature = "send-control-msg")]
106#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
107pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
108
109pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
111pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
113
114pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
116pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
118
119#[derive(Debug)]
120pub struct ClientCirc {
163 mutable: Arc<Mutex<MutableState>>,
165 unique_id: UniqId,
167 pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
169 pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
171 #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
174 reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
175 #[cfg(test)]
177 circid: CircId,
178 memquota: CircuitAccount,
180 time_provider: DynTimeProvider,
182}
183
184#[derive(Educe, Default)]
186#[educe(Debug)]
187pub(super) struct MutableState {
188 pub(super) path: Arc<path::Path>,
194
195 #[educe(Debug(ignore))]
203 pub(super) binding: Vec<Option<CircuitBinding>>,
204}
205
206pub struct PendingClientCirc {
211 recvcreated: oneshot::Receiver<CreateResponse>,
214 circ: Arc<ClientCirc>,
216}
217
218#[non_exhaustive]
220#[derive(Clone, Debug)]
221pub struct CircParameters {
222 pub extend_by_ed25519_id: bool,
225 pub ccontrol: CongestionControlParams,
227}
228
/// Test-only defaults: Ed25519 link specifiers kept, and the fixed
/// congestion-control parameters from the congestion test utilities.
#[cfg(test)]
impl std::default::Default for CircParameters {
    fn default() -> Self {
        use crate::congestion::test_utils::params::build_cc_fixed_params;

        CircParameters {
            ccontrol: build_cc_fixed_params(),
            extend_by_ed25519_id: true,
        }
    }
}
238
239impl CircParameters {
240 pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
242 Self {
243 extend_by_ed25519_id,
244 ccontrol,
245 }
246 }
247}
248
249impl ClientCirc {
250 pub fn first_hop(&self) -> OwnedChanTarget {
258 let first_hop = self
259 .mutable
260 .lock()
261 .expect("poisoned lock")
262 .path
263 .first_hop()
264 .expect("called first_hop on an un-constructed circuit");
265 match first_hop {
266 path::HopDetail::Relay(r) => r,
267 #[cfg(feature = "hs-common")]
268 path::HopDetail::Virtual => {
269 panic!("somehow made a circuit with a virtual first hop.")
270 }
271 }
272 }
273
274 pub fn last_hop_num(&self) -> Result<HopNum> {
279 Ok(self
280 .mutable
281 .lock()
282 .expect("poisoned lock")
283 .path
284 .last_hop_num()
285 .ok_or_else(|| internal!("no last hop index"))?)
286 }
287
288 #[deprecated(since = "0.11.1", note = "Use path_ref() instead.")]
297 pub fn path(&self) -> Vec<OwnedChanTarget> {
298 #[allow(clippy::unnecessary_filter_map)] self.mutable
300 .lock()
301 .expect("poisoned lock")
302 .path
303 .all_hops()
304 .into_iter()
305 .filter_map(|hop| match hop {
306 path::HopDetail::Relay(r) => Some(r),
307 #[cfg(feature = "hs-common")]
308 path::HopDetail::Virtual => None,
309 })
310 .collect()
311 }
312
313 pub fn path_ref(&self) -> Arc<Path> {
318 self.mutable.lock().expect("poisoned_lock").path.clone()
319 }
320
321 pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
325 let (tx, rx) = oneshot::channel();
326
327 self.control
328 .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
329 .map_err(|_| Error::CircuitClosed)?;
330
331 Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
332 }
333
334 pub fn mq_account(&self) -> &CircuitAccount {
336 &self.memquota
337 }
338
339 pub fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
347 self.mutable
348 .lock()
349 .expect("poisoned lock")
350 .binding
351 .get::<usize>(hop.into())
352 .cloned()
353 .flatten()
354 }
357
358 #[cfg(feature = "send-control-msg")]
436 pub async fn start_conversation(
437 &self,
438 msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
439 reply_handler: impl MsgHandler + Send + 'static,
440 hop_num: HopNum,
441 ) -> Result<Conversation<'_>> {
442 let handler = Box::new(UserMsgHandler::new(hop_num, reply_handler));
443 let conversation = Conversation(self);
444 conversation.send_internal(msg, Some(handler)).await?;
445 Ok(conversation)
446 }
447
448 #[cfg(feature = "send-control-msg")]
452 #[deprecated(since = "0.13.0", note = "Use start_conversation instead.")]
453 pub async fn start_conversation_last_hop(
454 &self,
455 msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
456 reply_handler: impl MsgHandler + Send + 'static,
457 ) -> Result<Conversation<'_>> {
458 let last_hop = self
459 .mutable
460 .lock()
461 .expect("poisoned lock")
462 .path
463 .last_hop_num()
464 .ok_or_else(|| internal!("no last hop index"))?;
465
466 self.start_conversation(msg, reply_handler, last_hop).await
467 }
468
469 #[cfg(feature = "send-control-msg")]
475 pub async fn send_raw_msg(
476 &self,
477 msg: tor_cell::relaycell::msg::AnyRelayMsg,
478 hop_num: HopNum,
479 ) -> Result<()> {
480 let (sender, receiver) = oneshot::channel();
481 let ctrl_msg = CtrlMsg::SendMsg {
482 hop_num,
483 msg,
484 sender,
485 };
486 self.control
487 .unbounded_send(ctrl_msg)
488 .map_err(|_| Error::CircuitClosed)?;
489
490 receiver.await.map_err(|_| Error::CircuitClosed)?
491 }
492
493 #[cfg(feature = "hs-service")]
513 pub async fn allow_stream_requests(
514 self: &Arc<ClientCirc>,
515 allow_commands: &[tor_cell::relaycell::RelayCmd],
516 hop_num: HopNum,
517 filter: impl crate::stream::IncomingStreamRequestFilter,
518 ) -> Result<impl futures::Stream<Item = IncomingStream>> {
519 use futures::stream::StreamExt;
520
521 const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
523
524 let time_prov = self.time_provider.clone();
525 let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
526 let (incoming_sender, incoming_receiver) =
527 MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
528 let (tx, rx) = oneshot::channel();
529
530 self.command
531 .unbounded_send(CtrlCmd::AwaitStreamRequest {
532 cmd_checker,
533 incoming_sender,
534 hop_num,
535 done: tx,
536 filter: Box::new(filter),
537 })
538 .map_err(|_| Error::CircuitClosed)?;
539
540 rx.await.map_err(|_| Error::CircuitClosed)??;
542
543 let allowed_hop_num = hop_num;
544
545 let circ = Arc::clone(self);
546 Ok(incoming_receiver.map(move |req_ctx| {
547 let StreamReqInfo {
548 req,
549 stream_id,
550 hop_num,
551 receiver,
552 msg_tx,
553 memquota,
554 } = req_ctx;
555
556 assert_eq!(allowed_hop_num, hop_num);
561
562 let target = StreamTarget {
563 circ: Arc::clone(&circ),
564 tx: msg_tx,
565 hop_num,
566 stream_id,
567 };
568
569 let reader = StreamReader {
570 target: target.clone(),
571 receiver,
572 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
573 ended: false,
574 };
575
576 IncomingStream::new(req, target, reader, memquota)
577 }))
578 }
579
580 pub async fn extend_ntor<Tg>(&self, target: &Tg, params: &CircParameters) -> Result<()>
583 where
584 Tg: CircTarget,
585 {
586 let key = NtorPublicKey {
587 id: *target
588 .rsa_identity()
589 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
590 pk: *target.ntor_onion_key(),
591 };
592 let mut linkspecs = target
593 .linkspecs()
594 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
595 if !params.extend_by_ed25519_id {
596 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
597 }
598
599 let (tx, rx) = oneshot::channel();
600
601 let peer_id = OwnedChanTarget::from_chan_target(target);
602 self.control
603 .unbounded_send(CtrlMsg::ExtendNtor {
604 peer_id,
605 public_key: key,
606 linkspecs,
607 params: params.clone(),
608 done: tx,
609 })
610 .map_err(|_| Error::CircuitClosed)?;
611
612 rx.await.map_err(|_| Error::CircuitClosed)??;
613
614 Ok(())
615 }
616
617 #[cfg(feature = "ntor_v3")]
620 pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: &CircParameters) -> Result<()>
621 where
622 Tg: CircTarget,
623 {
624 let key = NtorV3PublicKey {
625 id: *target
626 .ed_identity()
627 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
628 pk: *target.ntor_onion_key(),
629 };
630 let mut linkspecs = target
631 .linkspecs()
632 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
633 if !params.extend_by_ed25519_id {
634 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
635 }
636
637 let (tx, rx) = oneshot::channel();
638
639 let peer_id = OwnedChanTarget::from_chan_target(target);
640 self.control
641 .unbounded_send(CtrlMsg::ExtendNtorV3 {
642 peer_id,
643 public_key: key,
644 linkspecs,
645 params: params.clone(),
646 done: tx,
647 })
648 .map_err(|_| Error::CircuitClosed)?;
649
650 rx.await.map_err(|_| Error::CircuitClosed)??;
651
652 Ok(())
653 }
654
655 #[cfg(feature = "hs-common")]
681 pub async fn extend_virtual(
682 &self,
683 protocol: handshake::RelayProtocol,
684 role: handshake::HandshakeRole,
685 seed: impl handshake::KeyGenerator,
686 params: CircParameters,
687 ) -> Result<()> {
688 use self::handshake::BoxedClientLayer;
689
690 let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
691 let relay_cell_format = protocol.relay_cell_format();
692
693 let BoxedClientLayer { fwd, back, binding } = protocol.construct_layers(role, seed)?;
694
695 let (tx, rx) = oneshot::channel();
696 let message = CtrlCmd::ExtendVirtual {
697 relay_cell_format,
698 cell_crypto: (fwd, back, binding),
699 params,
700 done: tx,
701 };
702
703 self.command
704 .unbounded_send(message)
705 .map_err(|_| Error::CircuitClosed)?;
706
707 rx.await.map_err(|_| Error::CircuitClosed)?
708 }
709
710 async fn begin_stream_impl(
718 self: &Arc<ClientCirc>,
719 begin_msg: AnyRelayMsg,
720 cmd_checker: AnyCmdChecker,
721 ) -> Result<(StreamReader, StreamTarget, StreamAccount)> {
722 let time_prov = self.time_provider.clone();
726
727 let hop_num = self
728 .mutable
729 .lock()
730 .expect("poisoned lock")
731 .path
732 .last_hop_num()
733 .ok_or_else(|| Error::from(internal!("Can't begin a stream at the 0th hop")))?;
734
735 let memquota = StreamAccount::new(self.mq_account())?;
736 let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER)
737 .new_mq(time_prov.clone(), memquota.as_raw_account())?;
738 let (tx, rx) = oneshot::channel();
739 let (msg_tx, msg_rx) =
740 MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
741
742 self.control
743 .unbounded_send(CtrlMsg::BeginStream {
744 hop_num,
745 message: begin_msg,
746 sender,
747 rx: msg_rx,
748 done: tx,
749 cmd_checker,
750 })
751 .map_err(|_| Error::CircuitClosed)?;
752
753 let stream_id = rx.await.map_err(|_| Error::CircuitClosed)??;
754
755 let target = StreamTarget {
756 circ: self.clone(),
757 tx: msg_tx,
758 hop_num,
759 stream_id,
760 };
761
762 let reader = StreamReader {
763 target: target.clone(),
764 receiver,
765 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
766 ended: false,
767 };
768
769 Ok((reader, target, memquota))
770 }
771
772 async fn begin_data_stream(
775 self: &Arc<ClientCirc>,
776 msg: AnyRelayMsg,
777 optimistic: bool,
778 ) -> Result<DataStream> {
779 let (reader, target, memquota) = self
780 .begin_stream_impl(msg, DataCmdChecker::new_any())
781 .await?;
782 let mut stream = DataStream::new(reader, target, memquota);
783 if !optimistic {
784 stream.wait_for_connection().await?;
785 }
786 Ok(stream)
787 }
788
789 pub async fn begin_stream(
795 self: &Arc<ClientCirc>,
796 target: &str,
797 port: u16,
798 parameters: Option<StreamParameters>,
799 ) -> Result<DataStream> {
800 let parameters = parameters.unwrap_or_default();
801 let begin_flags = parameters.begin_flags();
802 let optimistic = parameters.is_optimistic();
803 let target = if parameters.suppressing_hostname() {
804 ""
805 } else {
806 target
807 };
808 let beginmsg = Begin::new(target, port, begin_flags)
809 .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
810 self.begin_data_stream(beginmsg.into(), optimistic).await
811 }
812
813 pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
816 self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
821 .await
822 }
823
824 pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
830 let resolve_msg = Resolve::new(hostname);
831
832 let resolved_msg = self.try_resolve(resolve_msg).await?;
833
834 resolved_msg
835 .into_answers()
836 .into_iter()
837 .filter_map(|(val, _)| match resolvedval_to_result(val) {
838 Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
839 Ok(_) => None,
840 Err(e) => Some(Err(e)),
841 })
842 .collect()
843 }
844
845 pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
851 let resolve_ptr_msg = Resolve::new_reverse(&addr);
852
853 let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
854
855 resolved_msg
856 .into_answers()
857 .into_iter()
858 .filter_map(|(val, _)| match resolvedval_to_result(val) {
859 Ok(ResolvedVal::Hostname(v)) => Some(
860 String::from_utf8(v)
861 .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
862 ),
863 Ok(_) => None,
864 Err(e) => Some(Err(e)),
865 })
866 .collect()
867 }
868
869 async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
872 let (reader, _target, memquota) = self
873 .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
874 .await?;
875 let mut resolve_stream = ResolveStream::new(reader, memquota);
876 resolve_stream.read_msg().await
877 }
878
879 pub fn terminate(&self) {
890 let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
891 }
892
893 pub(crate) fn protocol_error(&self) {
901 self.terminate();
902 }
903
904 pub fn is_closing(&self) -> bool {
906 self.control.is_closed()
907 }
908
909 pub fn unique_id(&self) -> UniqId {
911 self.unique_id
912 }
913
914 pub fn n_hops(&self) -> usize {
921 self.mutable.lock().expect("poisoned lock").path.n_hops()
922 }
923
924 #[cfg(feature = "experimental-api")]
931 pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
932 self.reactor_closed_rx.clone().map(|_| ())
933 }
934}
935
/// Handle returned by `ClientCirc::start_conversation`.
///
/// It borrows the circuit it was started on, and lets the caller send
/// further messages with `send_message`.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);
946
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Sends `msg` on the circuit this conversation belongs to, without
    /// installing a handler.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        let no_handler = None;
        self.send_internal(Some(msg), no_handler).await
    }

    /// Sends a `CtrlMsg::SendMsgAndInstallHandler` request carrying the
    /// optional `msg` and the optional `handler`, and waits for the
    /// reactor's answer.
    ///
    /// The message, when present, is wrapped in an `AnyRelayMsgOuter` with
    /// no stream ID.
    ///
    /// # Errors
    ///
    /// Returns `Error::CircuitClosed` if the request cannot be queued or the
    /// reply channel is dropped; otherwise returns the reactor's own result.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let Conversation(circ) = self;

        let (sender, receiver) = oneshot::channel();
        let request = CtrlMsg::SendMsgAndInstallHandler {
            msg: msg.map(|inner| tor_cell::relaycell::AnyRelayMsgOuter::new(None, inner)),
            handler,
            sender,
        };
        circ.control
            .unbounded_send(request)
            .map_err(|_| Error::CircuitClosed)?;

        match receiver.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
982
983impl PendingClientCirc {
984 pub(crate) fn new(
990 id: CircId,
991 channel: Arc<Channel>,
992 createdreceiver: oneshot::Receiver<CreateResponse>,
993 input: CircuitRxReceiver,
994 unique_id: UniqId,
995 memquota: CircuitAccount,
996 ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
997 let time_provider = channel.time_provider().clone();
998 let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
999 Reactor::new(channel, id, unique_id, input, memquota.clone());
1000
1001 let circuit = ClientCirc {
1002 mutable,
1003 unique_id,
1004 control: control_tx,
1005 command: command_tx,
1006 reactor_closed_rx: reactor_closed_rx.shared(),
1007 #[cfg(test)]
1008 circid: id,
1009 memquota,
1010 time_provider,
1011 };
1012
1013 let pending = PendingClientCirc {
1014 recvcreated: createdreceiver,
1015 circ: Arc::new(circuit),
1016 };
1017 (pending, reactor)
1018 }
1019
1020 pub fn peek_unique_id(&self) -> UniqId {
1022 self.circ.unique_id
1023 }
1024
1025 pub async fn create_firsthop_fast(self, params: &CircParameters) -> Result<Arc<ClientCirc>> {
1032 let (tx, rx) = oneshot::channel();
1033 self.circ
1034 .control
1035 .unbounded_send(CtrlMsg::Create {
1036 recv_created: self.recvcreated,
1037 handshake: CircuitHandshake::CreateFast,
1038 params: params.clone(),
1039 done: tx,
1040 })
1041 .map_err(|_| Error::CircuitClosed)?;
1042
1043 rx.await.map_err(|_| Error::CircuitClosed)??;
1044
1045 Ok(self.circ)
1046 }
1047
1048 pub async fn create_firsthop_ntor<Tg>(
1053 self,
1054 target: &Tg,
1055 params: CircParameters,
1056 ) -> Result<Arc<ClientCirc>>
1057 where
1058 Tg: tor_linkspec::CircTarget,
1059 {
1060 let (tx, rx) = oneshot::channel();
1061
1062 self.circ
1063 .control
1064 .unbounded_send(CtrlMsg::Create {
1065 recv_created: self.recvcreated,
1066 handshake: CircuitHandshake::Ntor {
1067 public_key: NtorPublicKey {
1068 id: *target
1069 .rsa_identity()
1070 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1071 pk: *target.ntor_onion_key(),
1072 },
1073 ed_identity: *target
1074 .ed_identity()
1075 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1076 },
1077 params: params.clone(),
1078 done: tx,
1079 })
1080 .map_err(|_| Error::CircuitClosed)?;
1081
1082 rx.await.map_err(|_| Error::CircuitClosed)??;
1083
1084 Ok(self.circ)
1085 }
1086
1087 #[cfg(feature = "ntor_v3")]
1096 pub async fn create_firsthop_ntor_v3<Tg>(
1097 self,
1098 target: &Tg,
1099 params: CircParameters,
1100 ) -> Result<Arc<ClientCirc>>
1101 where
1102 Tg: tor_linkspec::CircTarget,
1103 {
1104 let (tx, rx) = oneshot::channel();
1105
1106 self.circ
1107 .control
1108 .unbounded_send(CtrlMsg::Create {
1109 recv_created: self.recvcreated,
1110 handshake: CircuitHandshake::NtorV3 {
1111 public_key: NtorV3PublicKey {
1112 id: *target
1113 .ed_identity()
1114 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1115 pk: *target.ntor_onion_key(),
1116 },
1117 },
1118 params: params.clone(),
1119 done: tx,
1120 })
1121 .map_err(|_| Error::CircuitClosed)?;
1122
1123 rx.await.map_err(|_| Error::CircuitClosed)??;
1124
1125 Ok(self.circ)
1126 }
1127}
1128
1129fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
1132 match val {
1133 ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
1134 ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
1135 ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
1136 _ => Ok(val),
1137 }
1138}
1139
1140#[cfg(test)]
1141pub(crate) mod test {
1142 #![allow(clippy::bool_assert_comparison)]
1144 #![allow(clippy::clone_on_copy)]
1145 #![allow(clippy::dbg_macro)]
1146 #![allow(clippy::mixed_attributes_style)]
1147 #![allow(clippy::print_stderr)]
1148 #![allow(clippy::print_stdout)]
1149 #![allow(clippy::single_char_pattern)]
1150 #![allow(clippy::unwrap_used)]
1151 #![allow(clippy::unchecked_duration_subtraction)]
1152 #![allow(clippy::useless_vec)]
1153 #![allow(clippy::needless_pass_by_value)]
1154 use super::*;
1157 use crate::channel::OpenChanCellS2C;
1158 use crate::channel::{test::new_reactor, CodecError};
1159 use crate::congestion::sendme;
1160 use crate::crypto::cell::RelayCellBody;
1161 #[cfg(feature = "ntor_v3")]
1162 use crate::crypto::handshake::ntor_v3::NtorV3Server;
1163 #[cfg(feature = "hs-service")]
1164 use crate::stream::IncomingStreamRequestFilter;
1165 use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1166 use futures::channel::mpsc::{Receiver, Sender};
1167 use futures::io::{AsyncReadExt, AsyncWriteExt};
1168 use futures::sink::SinkExt;
1169 use futures::stream::StreamExt;
1170 use futures::task::SpawnExt;
1171 use hex_literal::hex;
1172 use std::collections::{HashMap, VecDeque};
1173 use std::fmt::Debug;
1174 use std::time::Duration;
1175 use tor_basic_utils::test_rng::testing_rng;
1176 use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody};
1177 use tor_cell::relaycell::extend::NtorV3Extension;
1178 use tor_cell::relaycell::{
1179 msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
1180 };
1181 use tor_linkspec::OwnedCircTarget;
1182 use tor_memquota::HasMemoryCost;
1183 use tor_rtcompat::Runtime;
1184 use tracing::trace;
1185 use tracing_test::traced_test;
1186
1187 impl PendingClientCirc {
1188 pub(crate) fn peek_circid(&self) -> CircId {
1190 self.circ.circid
1191 }
1192 }
1193
1194 impl ClientCirc {
1195 pub(crate) fn peek_circid(&self) -> CircId {
1197 self.circid
1198 }
1199 }
1200
1201 fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
1202 let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1203 .encode(&mut testing_rng())
1204 .unwrap();
1205 let chanmsg = chanmsg::Relay::from(body);
1206 ClientCircChanMsg::Relay(chanmsg)
1207 }
1208
1209 const EXAMPLE_SK: [u8; 32] =
1211 hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
1212 const EXAMPLE_PK: [u8; 32] =
1213 hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
1214 const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
1215 const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1216
1217 #[cfg(test)]
1219 pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
1220 buffer: usize,
1221 ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
1222 crate::fake_mpsc(buffer)
1223 }
1224
1225 fn example_target() -> OwnedCircTarget {
1227 let mut builder = OwnedCircTarget::builder();
1228 builder
1229 .chan_target()
1230 .ed_identity(EXAMPLE_ED_ID.into())
1231 .rsa_identity(EXAMPLE_RSA_ID.into());
1232 builder
1233 .ntor_onion_key(EXAMPLE_PK.into())
1234 .protocols("FlowCtrl=1".parse().unwrap())
1235 .build()
1236 .unwrap()
1237 }
1238 fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1239 crate::crypto::handshake::ntor::NtorSecretKey::new(
1240 EXAMPLE_SK.into(),
1241 EXAMPLE_PK.into(),
1242 EXAMPLE_RSA_ID.into(),
1243 )
1244 }
1245 #[cfg(feature = "ntor_v3")]
1246 fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1247 crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1248 EXAMPLE_SK.into(),
1249 EXAMPLE_PK.into(),
1250 EXAMPLE_ED_ID.into(),
1251 )
1252 }
1253
1254 fn working_fake_channel<R: Runtime>(
1255 rt: &R,
1256 ) -> (
1257 Arc<Channel>,
1258 Receiver<AnyChanCell>,
1259 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1260 ) {
1261 let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1262 rt.spawn(async {
1263 let _ignore = chan_reactor.run().await;
1264 })
1265 .unwrap();
1266 (channel, rx, tx)
1267 }
1268
1269 #[derive(Copy, Clone)]
1271 enum HandshakeType {
1272 Fast,
1273 Ntor,
1274 #[cfg(feature = "ntor_v3")]
1275 NtorV3,
1276 }
1277
1278 async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1279 use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
1283
1284 let (chan, mut rx, _sink) = working_fake_channel(rt);
1285 let circid = CircId::new(128).unwrap();
1286 let (created_send, created_recv) = oneshot::channel();
1287 let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1288 let unique_id = UniqId::new(23, 17);
1289
1290 let (pending, reactor) = PendingClientCirc::new(
1291 circid,
1292 chan,
1293 created_recv,
1294 circmsg_recv,
1295 unique_id,
1296 CircuitAccount::new_noop(),
1297 );
1298
1299 rt.spawn(async {
1300 let _ignore = reactor.run().await;
1301 })
1302 .unwrap();
1303
1304 let simulate_relay_fut = async move {
1306 let mut rng = testing_rng();
1307 let create_cell = rx.next().await.unwrap();
1308 assert_eq!(create_cell.circid(), Some(circid));
1309 let reply = match handshake_type {
1310 HandshakeType::Fast => {
1311 let cf = match create_cell.msg() {
1312 AnyChanMsg::CreateFast(cf) => cf,
1313 other => panic!("{:?}", other),
1314 };
1315 let (_, rep) = CreateFastServer::server(
1316 &mut rng,
1317 &mut |_: &()| Some(()),
1318 &[()],
1319 cf.handshake(),
1320 )
1321 .unwrap();
1322 CreateResponse::CreatedFast(CreatedFast::new(rep))
1323 }
1324 HandshakeType::Ntor => {
1325 let c2 = match create_cell.msg() {
1326 AnyChanMsg::Create2(c2) => c2,
1327 other => panic!("{:?}", other),
1328 };
1329 let (_, rep) = NtorServer::server(
1330 &mut rng,
1331 &mut |_: &()| Some(()),
1332 &[example_ntor_key()],
1333 c2.body(),
1334 )
1335 .unwrap();
1336 CreateResponse::Created2(Created2::new(rep))
1337 }
1338 #[cfg(feature = "ntor_v3")]
1339 HandshakeType::NtorV3 => {
1340 let c2 = match create_cell.msg() {
1341 AnyChanMsg::Create2(c2) => c2,
1342 other => panic!("{:?}", other),
1343 };
1344 let (_, rep) = NtorV3Server::server(
1345 &mut rng,
1346 &mut |_: &_| Some(vec![]),
1347 &[example_ntor_v3_key()],
1348 c2.body(),
1349 )
1350 .unwrap();
1351 CreateResponse::Created2(Created2::new(rep))
1352 }
1353 };
1354 created_send.send(reply).unwrap();
1355 };
1356 let client_fut = async move {
1358 let target = example_target();
1359 let params = CircParameters::default();
1360 let ret = match handshake_type {
1361 HandshakeType::Fast => {
1362 trace!("doing fast create");
1363 pending.create_firsthop_fast(¶ms).await
1364 }
1365 HandshakeType::Ntor => {
1366 trace!("doing ntor create");
1367 pending.create_firsthop_ntor(&target, params).await
1368 }
1369 #[cfg(feature = "ntor_v3")]
1370 HandshakeType::NtorV3 => {
1371 trace!("doing ntor_v3 create");
1372 pending.create_firsthop_ntor_v3(&target, params).await
1373 }
1374 };
1375 trace!("create done: result {:?}", ret);
1376 ret
1377 };
1378
1379 let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1380
1381 let _circ = circ.unwrap();
1382
1383 assert_eq!(_circ.n_hops(), 1);
1385 }
1386
1387 #[traced_test]
1388 #[test]
1389 fn test_create_fast() {
1390 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1391 test_create(&rt, HandshakeType::Fast).await;
1392 });
1393 }
1394 #[traced_test]
1395 #[test]
1396 fn test_create_ntor() {
1397 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1398 test_create(&rt, HandshakeType::Ntor).await;
1399 });
1400 }
1401 #[cfg(feature = "ntor_v3")]
1402 #[traced_test]
1403 #[test]
1404 fn test_create_ntor_v3() {
1405 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1406 test_create(&rt, HandshakeType::NtorV3).await;
1407 });
1408 }
1409
1410 pub(crate) struct DummyCrypto {
1413 counter_tag: [u8; 20],
1414 counter: u32,
1415 lasthop: bool,
1416 }
1417 impl DummyCrypto {
1418 fn next_tag(&mut self) -> &[u8; 20] {
1419 #![allow(clippy::identity_op)]
1420 self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1421 self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1422 self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1423 self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1424 self.counter += 1;
1425 &self.counter_tag
1426 }
1427 }
1428
1429 impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1430 fn originate_for(&mut self, _cell: &mut RelayCellBody) -> &[u8] {
1431 self.next_tag()
1432 }
1433 fn encrypt_outbound(&mut self, _cell: &mut RelayCellBody) {}
1434 }
1435 impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1436 fn decrypt_inbound(&mut self, _cell: &mut RelayCellBody) -> Option<&[u8]> {
1437 if self.lasthop {
1438 Some(self.next_tag())
1439 } else {
1440 None
1441 }
1442 }
1443 }
1444 impl DummyCrypto {
1445 pub(crate) fn new(lasthop: bool) -> Self {
1446 DummyCrypto {
1447 counter_tag: [0; 20],
1448 counter: 0,
1449 lasthop,
1450 }
1451 }
1452 }
1453
1454 async fn newcirc_ext<R: Runtime>(
1457 rt: &R,
1458 chan: Arc<Channel>,
1459 next_msg_from: HopNum,
1460 ) -> (Arc<ClientCirc>, CircuitRxSender) {
1461 let circid = CircId::new(128).unwrap();
1462 let (_created_send, created_recv) = oneshot::channel();
1463 let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1464 let unique_id = UniqId::new(23, 17);
1465
1466 let (pending, reactor) = PendingClientCirc::new(
1467 circid,
1468 chan,
1469 created_recv,
1470 circmsg_recv,
1471 unique_id,
1472 CircuitAccount::new_noop(),
1473 );
1474
1475 rt.spawn(async {
1476 let _ignore = reactor.run().await;
1477 })
1478 .unwrap();
1479
1480 let PendingClientCirc {
1481 circ,
1482 recvcreated: _,
1483 } = pending;
1484
1485 let relay_cell_format = RelayCellFormat::V0;
1487 for idx in 0_u8..3 {
1488 let params = CircParameters::default();
1489 let (tx, rx) = oneshot::channel();
1490 circ.command
1491 .unbounded_send(CtrlCmd::AddFakeHop {
1492 relay_cell_format,
1493 fwd_lasthop: idx == 2,
1494 rev_lasthop: idx == u8::from(next_msg_from),
1495 params,
1496 done: tx,
1497 })
1498 .unwrap();
1499 rx.await.unwrap().unwrap();
1500 }
1501
1502 (circ, circmsg_send)
1503 }
1504
1505 async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
1508 newcirc_ext(rt, chan, 2.into()).await
1509 }
1510
1511 async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1512 use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
1513
1514 let (chan, mut rx, _sink) = working_fake_channel(rt);
1515 let (circ, mut sink) = newcirc(rt, chan).await;
1516 let circid = circ.peek_circid();
1517 let params = CircParameters::default();
1518
1519 let extend_fut = async move {
1520 let target = example_target();
1521 match handshake_type {
1522 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1523 HandshakeType::Ntor => circ.extend_ntor(&target, ¶ms).await.unwrap(),
1524 #[cfg(feature = "ntor_v3")]
1525 HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, ¶ms).await.unwrap(),
1526 };
1527 circ };
1529 let reply_fut = async move {
1530 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1533 assert_eq!(id, Some(circid));
1534 let rmsg = match chmsg {
1535 AnyChanMsg::RelayEarly(r) => {
1536 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1537 .unwrap()
1538 }
1539 other => panic!("{:?}", other),
1540 };
1541 let e2 = match rmsg.msg() {
1542 AnyRelayMsg::Extend2(e2) => e2,
1543 other => panic!("{:?}", other),
1544 };
1545 let mut rng = testing_rng();
1546 let reply = match handshake_type {
1547 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1548 HandshakeType::Ntor => {
1549 let (_keygen, reply) = NtorServer::server(
1550 &mut rng,
1551 &mut |_: &()| Some(()),
1552 &[example_ntor_key()],
1553 e2.handshake(),
1554 )
1555 .unwrap();
1556 reply
1557 }
1558 #[cfg(feature = "ntor_v3")]
1559 HandshakeType::NtorV3 => {
1560 let (_keygen, reply) = NtorV3Server::server(
1561 &mut rng,
1562 &mut |_: &[NtorV3Extension]| Some(vec![]),
1563 &[example_ntor_v3_key()],
1564 e2.handshake(),
1565 )
1566 .unwrap();
1567 reply
1568 }
1569 };
1570
1571 let extended2 = relaymsg::Extended2::new(reply).into();
1572 sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
1573 (sink, rx) };
1575
1576 let (circ, _) = futures::join!(extend_fut, reply_fut);
1577
1578 assert_eq!(circ.n_hops(), 4);
1580
1581 #[allow(deprecated)]
1583 {
1584 let path = circ.path();
1585 assert_eq!(path.len(), 4);
1586 use tor_linkspec::HasRelayIds;
1587 assert_eq!(path[3].ed_identity(), example_target().ed_identity());
1588 assert_ne!(path[0].ed_identity(), example_target().ed_identity());
1589 }
1590 {
1591 let path = circ.path_ref();
1592 assert_eq!(path.n_hops(), 4);
1593 use tor_linkspec::HasRelayIds;
1594 assert_eq!(
1595 path.hops()[3].as_chan_target().unwrap().ed_identity(),
1596 example_target().ed_identity()
1597 );
1598 assert_ne!(
1599 path.hops()[0].as_chan_target().unwrap().ed_identity(),
1600 example_target().ed_identity()
1601 );
1602 }
1603 }
1604
1605 #[traced_test]
1606 #[test]
1607 fn test_extend_ntor() {
1608 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1609 test_extend(&rt, HandshakeType::Ntor).await;
1610 });
1611 }
1612
/// Runs the shared `test_extend` scenario with the ntor-v3 handshake, once per
/// runtime supplied by `test_with_all_runtimes!`.
///
/// Only compiled when the `ntor_v3` feature is enabled.
#[cfg(feature = "ntor_v3")]
#[traced_test]
#[test]
fn test_extend_ntor_v3() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let handshake = HandshakeType::NtorV3;
        test_extend(&rt, handshake).await;
    });
}
1621
1622 async fn bad_extend_test_impl<R: Runtime>(
1623 rt: &R,
1624 reply_hop: HopNum,
1625 bad_reply: ClientCircChanMsg,
1626 ) -> Error {
1627 let (chan, _rx, _sink) = working_fake_channel(rt);
1628 let (circ, mut sink) = newcirc_ext(rt, chan, reply_hop).await;
1629 let params = CircParameters::default();
1630
1631 let target = example_target();
1632 #[allow(clippy::clone_on_copy)]
1633 let rtc = rt.clone();
1634 let sink_handle = rt
1635 .spawn_with_handle(async move {
1636 rtc.sleep(Duration::from_millis(100)).await;
1637 sink.send(bad_reply).await.unwrap();
1638 sink
1639 })
1640 .unwrap();
1641 let outcome = circ.extend_ntor(&target, ¶ms).await;
1642 let _sink = sink_handle.await;
1643
1644 assert_eq!(circ.n_hops(), 3);
1645 assert!(outcome.is_err());
1646 outcome.unwrap_err()
1647 }
1648
1649 #[traced_test]
1650 #[test]
1651 fn bad_extend_wronghop() {
1652 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1653 let extended2 = relaymsg::Extended2::new(vec![]).into();
1654 let cc = rmsg_to_ccmsg(None, extended2);
1655
1656 let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
1657 match error {
1662 Error::CircuitClosed => {}
1663 x => panic!("got other error: {}", x),
1664 }
1665 });
1666 }
1667
1668 #[traced_test]
1669 #[test]
1670 fn bad_extend_wrongtype() {
1671 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1672 let extended = relaymsg::Extended::new(vec![7; 200]).into();
1673 let cc = rmsg_to_ccmsg(None, extended);
1674
1675 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1676 match error {
1677 Error::BytesErr {
1678 err: tor_bytes::Error::InvalidMessage(_),
1679 object: "extended2 message",
1680 } => {}
1681 other => panic!("{:?}", other),
1682 }
1683 });
1684 }
1685
1686 #[traced_test]
1687 #[test]
1688 fn bad_extend_destroy() {
1689 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1690 let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
1691 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1692 match error {
1693 Error::CircuitClosed => {}
1694 other => panic!("{:?}", other),
1695 }
1696 });
1697 }
1698
1699 #[traced_test]
1700 #[test]
1701 fn bad_extend_crypto() {
1702 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1703 let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
1704 let cc = rmsg_to_ccmsg(None, extended2);
1705 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1706 assert!(matches!(error, Error::BadCircHandshakeAuth));
1707 });
1708 }
1709
1710 #[traced_test]
1711 #[test]
1712 fn begindir() {
1713 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1714 let (chan, mut rx, _sink) = working_fake_channel(&rt);
1715 let (circ, mut sink) = newcirc(&rt, chan).await;
1716 let circid = circ.peek_circid();
1717
1718 let begin_and_send_fut = async move {
1719 let mut stream = circ.begin_dir_stream().await.unwrap();
1722 stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
1723 stream.flush().await.unwrap();
1724 let mut buf = [0_u8; 1024];
1725 let n = stream.read(&mut buf).await.unwrap();
1726 assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
1727 let n = stream.read(&mut buf).await.unwrap();
1728 assert_eq!(n, 0);
1729 stream
1730 };
1731 let reply_fut = async move {
1732 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1735 assert_eq!(id, Some(circid));
1736 let rmsg = match chmsg {
1737 AnyChanMsg::Relay(r) => {
1738 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1739 .unwrap()
1740 }
1741 other => panic!("{:?}", other),
1742 };
1743 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1744 assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
1745
1746 let connected = relaymsg::Connected::new_empty().into();
1748 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1749
1750 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1752 assert_eq!(id, Some(circid));
1753 let rmsg = match chmsg {
1754 AnyChanMsg::Relay(r) => {
1755 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1756 .unwrap()
1757 }
1758 other => panic!("{:?}", other),
1759 };
1760 let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
1761 assert_eq!(streamid_2, streamid);
1762 if let AnyRelayMsg::Data(d) = rmsg {
1763 assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
1764 } else {
1765 panic!();
1766 }
1767
1768 let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
1770 .unwrap()
1771 .into();
1772 sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
1773
1774 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
1776 sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
1777
1778 (rx, sink) };
1780
1781 let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
1782 });
1783 }
1784
1785 fn close_stream_helper(by_drop: bool) {
1787 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1788 let (chan, mut rx, _sink) = working_fake_channel(&rt);
1789 let (circ, mut sink) = newcirc(&rt, chan).await;
1790
1791 let stream_fut = async move {
1792 let stream = circ
1793 .begin_stream("www.example.com", 80, None)
1794 .await
1795 .unwrap();
1796
1797 let (r, mut w) = stream.split();
1798 if by_drop {
1799 drop(r);
1801 drop(w);
1802 (None, circ) } else {
1804 w.close().await.unwrap();
1806 (Some(r), circ)
1807 }
1808 };
1809 let handler_fut = async {
1810 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1812 let rmsg = match msg {
1813 AnyChanMsg::Relay(r) => {
1814 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1815 .unwrap()
1816 }
1817 other => panic!("{:?}", other),
1818 };
1819 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1820 assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1821
1822 let connected =
1824 relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1825 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1826
1827 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1829 let rmsg = match msg {
1830 AnyChanMsg::Relay(r) => {
1831 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1832 .unwrap()
1833 }
1834 other => panic!("{:?}", other),
1835 };
1836 let (_, rmsg) = rmsg.into_streamid_and_msg();
1837 assert_eq!(rmsg.cmd(), RelayCmd::END);
1838
1839 (rx, sink) };
1841
1842 let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
1843 });
1844 }
1845
1846 #[traced_test]
1847 #[test]
1848 fn drop_stream() {
1849 close_stream_helper(true);
1850 }
1851
1852 #[traced_test]
1853 #[test]
1854 fn close_stream() {
1855 close_stream_helper(false);
1856 }
1857
1858 async fn setup_incoming_sendme_case<R: Runtime>(
1860 rt: &R,
1861 n_to_send: usize,
1862 ) -> (
1863 Arc<ClientCirc>,
1864 DataStream,
1865 CircuitRxSender,
1866 Option<StreamId>,
1867 usize,
1868 Receiver<AnyChanCell>,
1869 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1870 ) {
1871 let (chan, mut rx, sink2) = working_fake_channel(rt);
1872 let (circ, mut sink) = newcirc(rt, chan).await;
1873 let circid = circ.peek_circid();
1874
1875 let begin_and_send_fut = {
1876 let circ = circ.clone();
1877 async move {
1878 let mut stream = circ
1880 .begin_stream("www.example.com", 443, None)
1881 .await
1882 .unwrap();
1883 let junk = [0_u8; 1024];
1884 let mut remaining = n_to_send;
1885 while remaining > 0 {
1886 let n = std::cmp::min(remaining, junk.len());
1887 stream.write_all(&junk[..n]).await.unwrap();
1888 remaining -= n;
1889 }
1890 stream.flush().await.unwrap();
1891 stream
1892 }
1893 };
1894
1895 let receive_fut = async move {
1896 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1898 let rmsg = match chmsg {
1899 AnyChanMsg::Relay(r) => {
1900 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1901 .unwrap()
1902 }
1903 other => panic!("{:?}", other),
1904 };
1905 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1906 assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
1907 let connected = relaymsg::Connected::new_empty().into();
1909 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1910 let mut bytes_received = 0_usize;
1912 let mut cells_received = 0_usize;
1913 while bytes_received < n_to_send {
1914 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1916 assert_eq!(id, Some(circid));
1917
1918 let rmsg = match chmsg {
1919 AnyChanMsg::Relay(r) => {
1920 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1921 .unwrap()
1922 }
1923 other => panic!("{:?}", other),
1924 };
1925 let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
1926 assert_eq!(streamid2, streamid);
1927 if let AnyRelayMsg::Data(dat) = rmsg {
1928 cells_received += 1;
1929 bytes_received += dat.as_ref().len();
1930 } else {
1931 panic!();
1932 }
1933 }
1934
1935 (sink, streamid, cells_received, rx)
1936 };
1937
1938 let (stream, (sink, streamid, cells_received, rx)) =
1939 futures::join!(begin_and_send_fut, receive_fut);
1940
1941 (circ, stream, sink, streamid, cells_received, rx, sink2)
1942 }
1943
1944 #[traced_test]
1945 #[test]
1946 fn accept_valid_sendme() {
1947 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1948 let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
1949 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
1950
1951 assert_eq!(cells_received, 301);
1952
1953 {
1955 let (tx, rx) = oneshot::channel();
1956 circ.command
1957 .unbounded_send(CtrlCmd::QuerySendWindow {
1958 hop: 2.into(),
1959 done: tx,
1960 })
1961 .unwrap();
1962 let (window, tags) = rx.await.unwrap().unwrap();
1963 assert_eq!(window, 1000 - 301);
1964 assert_eq!(tags.len(), 3);
1965 assert_eq!(
1967 tags[0],
1968 sendme::CircTag::from(hex!("6400000000000000000000000000000000000000"))
1969 );
1970 assert_eq!(
1972 tags[1],
1973 sendme::CircTag::from(hex!("c800000000000000000000000000000000000000"))
1974 );
1975 assert_eq!(
1977 tags[2],
1978 sendme::CircTag::from(hex!("2c01000000000000000000000000000000000000"))
1979 );
1980 }
1981
1982 let reply_with_sendme_fut = async move {
1983 let c_sendme =
1985 relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
1986 .into();
1987 sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
1988
1989 let s_sendme = relaymsg::Sendme::new_empty().into();
1991 sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
1992
1993 sink
1994 };
1995
1996 let _sink = reply_with_sendme_fut.await;
1997
1998 rt.advance_until_stalled().await;
1999
2000 {
2003 let (tx, rx) = oneshot::channel();
2004 circ.command
2005 .unbounded_send(CtrlCmd::QuerySendWindow {
2006 hop: 2.into(),
2007 done: tx,
2008 })
2009 .unwrap();
2010 let (window, _tags) = rx.await.unwrap().unwrap();
2011 assert_eq!(window, 1000 - 201);
2012 }
2013 });
2014 }
2015
2016 #[traced_test]
2017 #[test]
2018 fn invalid_circ_sendme() {
2019 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2020 let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2024 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2025
2026 let reply_with_sendme_fut = async move {
2027 let c_sendme =
2029 relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2030 .into();
2031 sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2032 sink
2033 };
2034
2035 let _sink = reply_with_sendme_fut.await;
2036
2037 rt.advance_until_stalled().await;
2039 assert!(circ.is_closing());
2040 });
2041 }
2042
2043 #[traced_test]
2044 #[test]
2045 fn test_busy_stream_fairness() {
2046 const N_STREAMS: usize = 3;
2048 const N_CELLS: usize = 20;
2050 const N_BYTES: usize = relaymsg::Data::MAXLEN * N_CELLS;
2053 const MIN_EXPECTED_BYTES_PER_STREAM: usize = N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN;
2060
2061 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2062 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2063 let (circ, mut sink) = newcirc(&rt, chan).await;
2064
2065 rt.spawn({
2071 let circ = circ.clone();
2074 async move {
2075 let mut clients = VecDeque::new();
2076 struct Client {
2077 stream: DataStream,
2078 to_write: &'static [u8],
2079 }
2080 for _ in 0..N_STREAMS {
2081 clients.push_back(Client {
2082 stream: circ
2083 .begin_stream("www.example.com", 80, None)
2084 .await
2085 .unwrap(),
2086 to_write: &[0_u8; N_BYTES][..],
2087 });
2088 }
2089 while let Some(mut client) = clients.pop_front() {
2090 if client.to_write.is_empty() {
2091 continue;
2093 }
2094 let written = client.stream.write(client.to_write).await.unwrap();
2095 client.to_write = &client.to_write[written..];
2096 clients.push_back(client);
2097 }
2098 }
2099 })
2100 .unwrap();
2101
2102 let channel_handler_fut = async {
2103 let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2104 let mut total_bytes_received = 0;
2105
2106 loop {
2107 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2108 let rmsg = match msg {
2109 AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2110 RelayCellFormat::V0,
2111 r.into_relay_body(),
2112 )
2113 .unwrap(),
2114 other => panic!("Unexpected chanmsg: {other:?}"),
2115 };
2116 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2117 match rmsg.cmd() {
2118 RelayCmd::BEGIN => {
2119 let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2121 assert_eq!(prev, None);
2122 let connected = relaymsg::Connected::new_with_addr(
2124 "10.0.0.1".parse().unwrap(),
2125 1234,
2126 )
2127 .into();
2128 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2129 }
2130 RelayCmd::DATA => {
2131 let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2132 let nbytes = data_msg.as_ref().len();
2133 total_bytes_received += nbytes;
2134 let streamid = streamid.unwrap();
2135 let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2136 *stream_bytes += nbytes;
2137 if total_bytes_received >= N_BYTES {
2138 break;
2139 }
2140 }
2141 RelayCmd::END => {
2142 continue;
2147 }
2148 other => {
2149 panic!("Unexpected command {other:?}");
2150 }
2151 }
2152 }
2153
2154 (total_bytes_received, stream_bytes_received, rx, sink)
2157 };
2158
2159 let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2160 channel_handler_fut.await;
2161 assert_eq!(stream_bytes_received.len(), N_STREAMS);
2162 for (sid, stream_bytes) in stream_bytes_received {
2163 assert!(
2164 stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2165 "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2166 );
2167 }
2168 });
2169 }
2170
/// `CircParameters::default()` enables `extend_by_ed25519_id`, and the field
/// can be switched off afterwards.
#[test]
fn basic_params() {
    use super::CircParameters;

    let mut params = CircParameters::default();
    // Enabled by default...
    assert!(params.extend_by_ed25519_id);

    // ...and writable.
    params.extend_by_ed25519_id = false;
    assert!(!params.extend_by_ed25519_id);
}
2180
/// Stream-request filter for the `hs-service` tests: it accepts every
/// incoming stream request.
#[cfg(feature = "hs-service")]
struct AllowAllStreamsFilter;

#[cfg(feature = "hs-service")]
impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
    /// Always answers `Accept`, ignoring both the request context and the
    /// circuit view.
    fn disposition(
        &mut self,
        _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
        _circ: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
    ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
        use crate::stream::IncomingStreamRequestDisposition as Disposition;

        Ok(Disposition::Accept)
    }
}
2193
/// Calling `allow_stream_requests` a second time on the same circuit, while
/// the result of the first call is still alive, must return an error.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn allow_stream_requests_twice() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, _send) = newcirc(&rt, chan).await;

        // First registration: must succeed. Keep the result alive.
        let first_hop = circ.last_hop_num().unwrap();
        let _incoming = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                first_hop,
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        // Second registration with identical arguments: must be refused.
        let second_hop = circ.last_hop_num().unwrap();
        let second_attempt = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                second_hop,
                AllowAllStreamsFilter,
            )
            .await;

        assert!(second_attempt.is_err());
    });
}
2223
/// Happy path for incoming streams: a BEGIN arriving on the circuit is
/// surfaced as an incoming stream request, the service accepts it, and data
/// sent afterwards on the same stream ID can be read from the data stream.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn allow_stream_requests() {
    use tor_cell::relaycell::msg::BeginFlags;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const TEST_DATA: &[u8] = b"ping";

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut send) = newcirc(&rt, chan).await;

        // Signals the client half once the service has accepted the stream.
        let (accepted_tx, accepted_rx) = oneshot::channel();
        let last_hop = circ.last_hop_num().unwrap();
        let mut incoming = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                last_hop,
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        // Service half: accept the first request and read the payload from it.
        let simulate_service = async move {
            let request = incoming.next().await.unwrap();
            let mut data_stream = request
                .accept_data(relaymsg::Connected::new_empty())
                .await
                .unwrap();
            accepted_tx.send(()).unwrap();

            let mut buf = [0_u8; TEST_DATA.len()];
            data_stream.read_exact(&mut buf).await.unwrap();
            assert_eq!(&buf, TEST_DATA);

            circ
        };

        // Client half: send a BEGIN, wait for acceptance, then send the payload.
        let simulate_client = async move {
            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                    .encode(&mut testing_rng())
                    .unwrap();
            let begin_msg = chanmsg::Relay::from(begin_body);
            send.send(ClientCircChanMsg::Relay(begin_msg))
                .await
                .unwrap();

            // Do not send data until the service side has accepted the stream.
            accepted_rx.await.unwrap();

            let data = relaymsg::Data::new(TEST_DATA).unwrap();
            let data_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                    .encode(&mut testing_rng())
                    .unwrap();
            let data_msg = chanmsg::Relay::from(data_body);
            send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();

            send
        };

        let (_circ, _send) = futures::join!(simulate_service, simulate_client);
    });
}
2298
/// After an incoming stream request has been rejected, a later BEGIN that
/// reuses the same stream ID must still be accepted and must carry data.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn accept_stream_after_reject() {
    use tor_cell::relaycell::msg::BeginFlags;
    use tor_cell::relaycell::msg::EndReason;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const TEST_DATA: &[u8] = b"ping";
        const STREAM_COUNT: usize = 2;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut send) = newcirc(&rt, chan).await;

        // Signals the client half each time the service has handled a request.
        let (mut handled_tx, mut handled_rx) = mpsc::channel(STREAM_COUNT);

        let last_hop = circ.last_hop_num().unwrap();
        let mut incoming = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                last_hop,
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        // Service half: reject the first request, accept the second.
        let simulate_service = async move {
            for i in 0..STREAM_COUNT {
                let request = incoming.next().await.unwrap();

                if i == 0 {
                    request
                        .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
                        .await
                        .unwrap();
                    handled_tx.send(()).await.unwrap();
                } else {
                    let mut data_stream = request
                        .accept_data(relaymsg::Connected::new_empty())
                        .await
                        .unwrap();
                    handled_tx.send(()).await.unwrap();

                    let mut buf = [0_u8; TEST_DATA.len()];
                    data_stream.read_exact(&mut buf).await.unwrap();
                    assert_eq!(&buf, TEST_DATA);
                }
            }

            circ
        };

        // Client half: send the same BEGIN once per stream, then the payload.
        let simulate_client = async move {
            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                    .encode(&mut testing_rng())
                    .unwrap();
            let begin_msg = chanmsg::Relay::from(begin_body);

            for _ in 0..STREAM_COUNT {
                send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
                    .await
                    .unwrap();

                // Wait until the service has rejected or accepted this request.
                handled_rx.next().await.unwrap();
            }

            let data = relaymsg::Data::new(TEST_DATA).unwrap();
            let data_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                    .encode(&mut testing_rng())
                    .unwrap();
            let data_msg = chanmsg::Relay::from(data_body);
            send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();

            send
        };

        let (_circ, _send) = futures::join!(simulate_service, simulate_client);
    });
}
2391
/// Stream requests are allowed from hop 1 only, but the test circuit
/// attributes inbound messages to hop 2 (see `newcirc`). A BEGIN arriving
/// that way must not be surfaced as a request: the incoming-request stream is
/// expected to end instead.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn incoming_stream_bad_hop() {
    use tor_cell::relaycell::msg::BeginFlags;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        // The only hop from which stream requests are permitted.
        const EXPECTED_HOP: u8 = 1;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut send) = newcirc(&rt, chan).await;

        let mut incoming = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                EXPECTED_HOP.into(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        // Service half: no request may ever be yielded.
        let simulate_service = async move {
            let next_request = incoming.next().await;
            assert!(next_request.is_none());
            circ
        };

        // Client half: send a single BEGIN on stream 12.
        let simulate_client = async move {
            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                    .encode(&mut testing_rng())
                    .unwrap();
            let begin_msg = chanmsg::Relay::from(begin_body);

            send.send(ClientCircChanMsg::Relay(begin_msg))
                .await
                .unwrap();

            send
        };

        let (_circ, _send) = futures::join!(simulate_service, simulate_client);
    });
}
2441}