use std::time::Duration;
use std::collections::HashMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use educe::Educe;
use futures::{AsyncRead, AsyncWrite};
use itertools::Itertools;
use rand::Rng;
use tor_bytes::Writeable;
use tor_cell::relaycell::hs::intro_payload::{self, IntroduceHandshakePayload};
use tor_cell::relaycell::msg::{AnyRelayMsg, Introduce1, Rendezvous2};
use tor_error::Bug;
use tor_hscrypto::Subcredential;
use tor_proto::circuit::handshake::hs_ntor;
use tracing::{debug, trace, warn};
use retry_error::RetryError;
use safelog::Sensitive;
use tor_cell::relaycell::hs::{
AuthKeyType, EstablishRendezvous, IntroduceAck, RendezvousEstablished,
};
use tor_cell::relaycell::RelayMsg;
use tor_checkable::{timed::TimerangeBound, Timebound};
use tor_circmgr::build::circparameters_from_netparameters;
use tor_circmgr::hspool::{HsCircKind, HsCircPool};
use tor_circmgr::timeouts::Action as TimeoutsAction;
use tor_dirclient::request::Requestable as _;
use tor_error::{internal, into_internal, ErrorReport as _};
use tor_error::{HasRetryTime as _, RetryTime};
use tor_hscrypto::pk::{HsBlindId, HsClientDescEncKey, HsId, HsIdKey};
use tor_hscrypto::RendCookie;
use tor_linkspec::{CircTarget, HasRelayIds, OwnedCircTarget, RelayId};
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use tor_netdir::{HsDirOp, NetDir, Relay};
use tor_netdoc::doc::hsdesc::{HsDesc, IntroPointDesc};
use tor_proto::circuit::{CircParameters, ClientCirc, MetaCellDisposition, MsgHandler};
use tor_rtcompat::{Runtime, SleepProviderExt as _, TimeoutError};
use crate::proto_oneshot;
use crate::relay_info::ipt_to_circtarget;
use crate::state::MockableConnectorData;
use crate::Config;
use crate::{rend_pt_identity_for_error, FailedAttemptError, IntroPtIndex, RendPtIdentityForError};
use crate::{ConnError, DescriptorError, DescriptorErrorDetail};
use crate::{HsClientConnector, HsClientSecretKeys};
use ConnError as CE;
use FailedAttemptError as FAE;
/// Circuit length, in hops, passed to the timeout estimator for circuit builds and round trips.
const HOPS: usize = 3;
/// Expands to the client circuit type for runtime `$R` and mocks `$M`:
/// the `ClientCirc` associated type of the `HsCircPool` that `$M` provides.
macro_rules! ClientCirc { { $R:ty, $M:ty } => {
<<$M as MocksForConnect<$R>>::HsCircPool as MockableCircPool<$R>>::ClientCirc
} }
/// State kept for a hidden service across connection attempts.
///
/// `connect` receives this as `&mut Data`, reads the cached descriptor and the
/// introduction point experiences from it, and updates both.
#[derive(Default, Educe)]
#[educe(Debug)]
pub struct Data {
/// Cached descriptor (with its validity bound), if one has been fetched.
desc: DataHsDesc,
/// Recorded experiences with introduction points, keyed by relay identity.
ipts: DataIpts,
}
/// Part of `Data` holding the cached descriptor: `None` until one has been stored.
type DataHsDesc = Option<TimerangeBound<HsDesc>>;
/// Part of `Data` holding what happened when each introduction point was used.
type DataIpts = HashMap<RelayIdForExperience, IptExperience>;
/// Record of one use of an introduction point.
#[derive(Debug)]
struct IptExperience {
/// Time elapsed between starting to use the introduction point and the outcome being recorded.
duration: Duration,
/// `Ok(())` if the attempt succeeded; otherwise the retry time of the error that ended it.
outcome: Result<(), RetryTime>,
}
/// Connects to the hidden service `hsid` and returns a circuit to it.
///
/// Builds a `Context` from the connector's runtime and circuit pool, using `()`
/// as the mocks value (the non-test implementations), and runs its `connect`
/// method, which reads and updates `data`.
///
/// # Errors
///
/// Returns the `ConnError` from constructing the context or from the
/// connection attempt itself.
pub(crate) async fn connect<R: Runtime>(
    connector: &HsClientConnector<R>,
    netdir: Arc<NetDir>,
    config: Arc<Config>,
    hsid: HsId,
    data: &mut Data,
    secret_keys: HsClientSecretKeys,
) -> Result<Arc<ClientCirc>, ConnError> {
    let runtime = &connector.runtime;
    let circpool = &*connector.circpool;
    let context = Context::new(runtime, circpool, netdir, config, hsid, secret_keys, ())?;
    context.connect(data).await
}
/// Everything needed to carry out one connection attempt to a hidden service.
///
/// Generic over the runtime `R` and over `M`, which supplies the circuit pool
/// type, the RNG, and the test hooks.
struct Context<'c, R: Runtime, M: MocksForConnect<R>> {
/// Runtime, used for timeouts and for reading the clocks.
runtime: &'c R,
/// Circuit pool from which circuits are obtained and timeout estimates are taken.
circpool: &'c M::HsCircPool,
/// Network directory used throughout the attempt.
netdir: Arc<NetDir>,
/// Configuration; the retry limits are read from it.
config: Arc<Config>,
/// Client secret keys; the descriptor decryption key is taken from here.
secret_keys: HsClientSecretKeys,
/// Identity of the service being connected to.
hsid: HsId,
/// Blinded identity of the service, computed in `new` for the netdir's time period.
hs_blind_id: HsBlindId,
/// Subcredential computed in `new` together with the blinded identity.
subcredential: Subcredential,
/// Test hooks and RNG provider.
mocks: M,
}
/// An established rendezvous point, as produced by `establish_rendezvous`.
struct Rendezvous<'r, R: Runtime, M: MocksForConnect<R>> {
/// The relay acting as the rendezvous point.
rend_relay: Relay<'r>,
/// Circuit to the rendezvous point.
rend_circ: Arc<ClientCirc!(R, M)>,
/// Cookie that was sent in the ESTABLISH_RENDEZVOUS message.
rend_cookie: RendCookie,
/// Receiver on which the RENDEZVOUS2 message is delivered.
rend2_rx: proto_oneshot::Receiver<Rendezvous2>,
/// Uses the type parameters `R` and `M` without storing values of those types.
marker: PhantomData<fn() -> (R, M)>,
}
/// Type of the random value used as the secondary sort key for introduction points.
type IptSortRand = u32;
/// An introduction point from the descriptor that could be converted into a circuit target.
struct UsableIntroPt<'i> {
/// Position of this introduction point in the descriptor's list.
intro_index: IntroPtIndex,
/// The descriptor's entry for this introduction point.
intro_desc: &'i IntroPointDesc,
/// Owned circuit target built from `intro_desc` and the netdir.
intro_target: OwnedCircTarget,
/// Random value generated when this entry was built; secondary sort key.
sort_rand: IptSortRand,
}
/// A relay identity used as the key of the introduction point experience map (`DataIpts`).
#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug)]
struct RelayIdForExperience(RelayId);
/// Result of a successful introduction, as produced by `exchange_introduce`.
struct Introduced<R: Runtime, M: MocksForConnect<R>> {
/// Client-side hs-ntor handshake state, later completed with the RENDEZVOUS2 message.
handshake_state: hs_ntor::HsNtorClientState,
/// Uses the type parameters `R` and `M` without storing values of those types.
marker: PhantomData<fn() -> (R, M)>,
}
impl RelayIdForExperience {
fn for_lookup(intro_target: &OwnedCircTarget) -> impl Iterator<Item = Self> + '_ {
intro_target
.identities()
.map(|id| RelayIdForExperience(id.to_owned()))
}
fn for_store(intro_target: &OwnedCircTarget) -> Result<Self, Bug> {
let id = intro_target
.identities()
.next()
.ok_or_else(|| internal!("introduction point relay with no identities"))?
.to_owned();
Ok(RelayIdForExperience(id))
}
}
/// Sort key for introduction points; smaller keys are tried first.
///
/// The derived ordering compares fields in declaration order, so `outcome`
/// dominates and `sort_rand` only breaks ties.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
struct IptSortKey {
/// What happened when this introduction point was last used, if it was.
outcome: IptSortKeyOutcome,
/// Random tie-breaker.
sort_rand: IptSortRand,
}
/// Outcome component of `IptSortKey`.
///
/// The derived ordering follows declaration order, so `Success` sorts before
/// `Untried`, which sorts before `Failed`; within a variant the fields are
/// compared in the order written.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
enum IptSortKeyOutcome {
/// The introduction point worked last time.
Success {
/// How long the successful use took; shorter sorts first.
duration: Duration,
},
/// No experience is recorded for this introduction point.
Untried,
/// The introduction point failed last time.
Failed {
/// Retry time of the recorded error, compared first.
retry_time: tor_error::LooseCmpRetryTime,
/// How long the failed use took, compared second; shorter sorts first.
duration: Duration,
},
}
impl From<Option<&IptExperience>> for IptSortKeyOutcome {
fn from(experience: Option<&IptExperience>) -> IptSortKeyOutcome {
use IptSortKeyOutcome as O;
match experience {
None => O::Untried,
Some(IptExperience { duration, outcome }) => match outcome {
Ok(()) => O::Success {
duration: *duration,
},
Err(retry_time) => O::Failed {
retry_time: (*retry_time).into(),
duration: *duration,
},
},
}
}
}
impl<'c, R: Runtime, M: MocksForConnect<R>> Context<'c, R, M> {
fn new(
runtime: &'c R,
circpool: &'c M::HsCircPool,
netdir: Arc<NetDir>,
config: Arc<Config>,
hsid: HsId,
secret_keys: HsClientSecretKeys,
mocks: M,
) -> Result<Self, ConnError> {
let time_period = netdir.hs_time_period();
let (hs_blind_id_key, subcredential) = HsIdKey::try_from(hsid)
.map_err(|_| CE::InvalidHsId)?
.compute_blinded_key(time_period)
.map_err(
into_internal!("key blinding error, don't know how to handle"),
)?;
let hs_blind_id = hs_blind_id_key.id();
Ok(Context {
netdir,
config,
hsid,
hs_blind_id,
subcredential,
circpool,
runtime,
secret_keys,
mocks,
})
}
async fn connect(&self, data: &mut Data) -> Result<Arc<ClientCirc!(R, M)>, ConnError> {
let mocks = self.mocks.clone();
let desc = self.descriptor_ensure(&mut data.desc).await?;
mocks.test_got_desc(desc);
let circ = self.intro_rend_connect(desc, &mut data.ipts).await?;
mocks.test_got_circ(&circ);
Ok(circ)
}
/// Returns a descriptor for the service, fetching one if the cache cannot be used.
///
/// If `data` holds a descriptor that is valid at the current wallclock time,
/// that one is returned without any fetch. Otherwise the HsDirs obtained from
/// the netdir are tried in turn, cycling through them for at most the
/// configured number of attempts, each attempt under its own timeout. The
/// first descriptor obtained is stored in `data` and a reference to it is
/// returned.
///
/// # Errors
///
/// * `CE::NoHsDirs` if the attempts ran out without any error having been recorded.
/// * `CE::DescriptorDownload` carrying the accumulated per-attempt errors otherwise.
async fn descriptor_ensure<'d>(&self, data: &'d mut DataHsDesc) -> Result<&'d HsDesc, CE> {
// Upper bound on the total number of fetch attempts, across all HsDirs.
let max_total_attempts = self
.config
.retry
.hs_desc_fetch_attempts()
.try_into()
// If the configured count does not fit in a usize, allow as many attempts as possible.
.unwrap_or(usize::MAX);
// Time limit for each fetch attempt: one circuit build plus one round trip.
let each_timeout = self.estimate_timeout(&[
(1, TimeoutsAction::BuildCircuit { length: HOPS }), (1, TimeoutsAction::RoundTrip { length: HOPS }), ]);
// Use the cached descriptor if it is valid right now.
if let Some(previously) = data {
let now = self.runtime.wallclock();
if let Ok(_desc) = previously.as_ref().check_valid_at(&now) {
// NOTE(review): the check is repeated on `data` instead of returning `_desc` directly,
// presumably to keep the borrow checker happy; both `expect`s restate what was just tested.
return Ok(data
.as_ref()
.expect("Some but now None")
.as_ref()
.check_valid_at(&now)
.expect("Ok but now Err"));
}
// Falling through here means the cached descriptor is not valid now: fetch a new one.
}
let hs_dirs = self.netdir.hs_dirs(
&self.hs_blind_id,
HsDirOp::Download,
&mut self.mocks.thread_rng(),
);
trace!(
"HS desc fetch for {}, using {} hsdirs",
&self.hsid,
hs_dirs.len()
);
// Go round the HsDirs repeatedly until the attempt budget is used up.
// With no HsDirs at all this iterator is empty from the start.
let mut attempts = hs_dirs.iter().cycle().take(max_total_attempts);
let mut errors = RetryError::in_attempt_to("retrieve hidden service descriptor");
let desc = loop {
let relay = match attempts.next() {
Some(relay) => relay,
None => {
// Out of attempts. If nothing was even tried, report that instead of an empty error list.
return Err(if errors.is_empty() {
CE::NoHsDirs
} else {
CE::DescriptorDownload(errors)
})
}
};
let hsdir_for_error: Sensitive<Ed25519Identity> = (*relay.id()).into();
match self
.runtime
.timeout(each_timeout, self.descriptor_fetch_attempt(relay))
.await
// An expired timeout is recorded as just another per-attempt error.
.unwrap_or(Err(DescriptorErrorDetail::Timeout))
{
Ok(desc) => break desc,
Err(error) => {
debug!(
"failed hsdir desc fetch for {} from {}: {}",
&self.hsid,
&relay.id(),
error.report()
);
errors.push(tor_error::Report(DescriptorError {
hsdir: hsdir_for_error,
error,
}));
}
}
};
// Store the time-bounded descriptor in the cache first, so that the returned reference borrows from `data`.
let ret = data.insert(desc);
// NOTE(review): skipping the time check here presumably relies on `descriptor_fetch_attempt`
// having validated the descriptor against the current time - confirm.
Ok(ret.as_ref().dangerously_assume_timely())
}
/// Makes one attempt to download, decrypt and validate the descriptor from `hsdir`.
///
/// Obtains a `ClientHsDir` circuit to `hsdir` from the circuit pool, opens a
/// directory stream on it, downloads the descriptor for our blinded identity
/// (limited to the `hsdir_max_desc_size` network parameter), and then parses,
/// decrypts and validates it using the current wallclock time, the
/// subcredential, and the client's descriptor decryption key if one is
/// configured.
///
/// No timeout is applied here; the caller wraps this call in one.
///
/// # Errors
///
/// Returns a `DescriptorErrorDetail` describing the stage that failed: circuit,
/// stream, directory request, or descriptor parsing/validation. Directory
/// client errors of an unexpected kind are reported as internal errors.
async fn descriptor_fetch_attempt(
    &self,
    hsdir: &Relay<'_>,
) -> Result<TimerangeBound<HsDesc>, DescriptorErrorDetail> {
    let max_len: usize = self
        .netdir
        .params()
        .hsdir_max_desc_size
        .get()
        .try_into()
        .map_err(into_internal!("BoundedInt was not truly bounded!"))?;
    let request = {
        let mut r = tor_dirclient::request::HsDescDownloadRequest::new(self.hs_blind_id);
        r.set_max_len(max_len);
        r
    };
    // The closing parenthesis after the http request was missing from this message.
    trace!(
        "hsdir for {}, trying {}/{}, request {:?} (http request {:?})",
        &self.hsid,
        &hsdir.id(),
        &hsdir.rsa_id(),
        &request,
        request.make_request()
    );
    let circuit = self
        .circpool
        .get_or_launch_specific(
            &self.netdir,
            HsCircKind::ClientHsDir,
            OwnedCircTarget::from_circ_target(hsdir),
        )
        .await?;
    let mut stream = circuit
        .begin_dir_stream()
        .await
        .map_err(DescriptorErrorDetail::Stream)?;
    let response = tor_dirclient::download(self.runtime, &request, &mut stream, None)
        .await
        .map_err(|dir_error| match dir_error {
            tor_dirclient::Error::RequestFailed(rfe) => DescriptorErrorDetail::from(rfe.error),
            // We handed the directory client a ready-made stream, so it should
            // never need to report a circuit manager problem.
            tor_dirclient::Error::CircMgr(ce) => into_internal!(
                "tor-dirclient complains about circmgr going wrong but we gave it a stream"
            )(ce)
            .into(),
            other => into_internal!(
                "tor-dirclient gave unexpected error, tor-hsclient code needs updating"
            )(other)
            .into(),
        })?;
    let desc_text = response.into_output_string().map_err(|rfe| rfe.error)?;
    // Pair the secret decryption key (if configured) with the public key derived from it.
    let hsc_desc_enc = self
        .secret_keys
        .keys
        .ks_hsc_desc_enc
        .as_ref()
        .map(|ks| (HsClientDescEncKey::from(ks), ks));
    let now = self.runtime.wallclock();
    HsDesc::parse_decrypt_validate(
        &desc_text,
        &self.hs_blind_id,
        now,
        &self.subcredential,
        hsc_desc_enc.as_ref().map(|(kp, ks)| (kp, *ks)),
    )
    .map_err(DescriptorErrorDetail::from)
}
async fn intro_rend_connect(
&self,
desc: &HsDesc,
data: &mut DataIpts,
) -> Result<Arc<ClientCirc!(R, M)>, CE> {
let max_total_attempts = self
.config
.retry
.hs_intro_rend_attempts()
.try_into()
.unwrap_or(usize::MAX);
let rend_timeout = self.estimate_timeout(&[
(1, TimeoutsAction::BuildCircuit { length: HOPS }), (1, TimeoutsAction::RoundTrip { length: HOPS }), ]);
let intro_timeout = self.estimate_timeout(&[
(1, TimeoutsAction::BuildCircuit { length: HOPS }), (1, TimeoutsAction::RoundTrip { length: HOPS }), ]);
let hs_hops = if desc.is_single_onion_service() {
1
} else {
HOPS
};
let rpt_ipt_timeout = self.estimate_timeout(&[
(1, TimeoutsAction::BuildCircuit { length: hs_hops }),
(1, TimeoutsAction::RoundTrip { length: HOPS }),
]);
let mut rend_attempts = 0..max_total_attempts;
let mut errors = RetryError::in_attempt_to("make circuit to to hidden service");
let mut usable_intros: Vec<UsableIntroPt> = desc
.intro_points()
.iter()
.enumerate()
.map(|(intro_index, intro_desc)| {
let intro_index = intro_index.into();
let intro_target = ipt_to_circtarget(intro_desc, &self.netdir)
.map_err(|error| FAE::UnusableIntro { error, intro_index })?;
let intro_target = OwnedCircTarget::from_circ_target(&intro_target);
Ok(UsableIntroPt {
intro_index,
intro_desc,
intro_target,
sort_rand: self.mocks.thread_rng().gen(),
})
})
.filter_map(|entry| match entry {
Ok(y) => Some(y),
Err(e) => {
errors.push(tor_error::Report(e));
None
}
})
.collect_vec();
data.retain(|k, _v| {
usable_intros
.iter()
.any(|ipt| RelayIdForExperience::for_lookup(&ipt.intro_target).any(|id| &id == k))
});
usable_intros.sort_by_key(|ipt: &UsableIntroPt| {
let experience =
RelayIdForExperience::for_lookup(&ipt.intro_target).find_map(|id| data.get(&id));
IptSortKey {
outcome: experience.into(),
sort_rand: ipt.sort_rand,
}
});
self.mocks.test_got_ipts(&usable_intros);
let mut intro_attempts = usable_intros.iter().cycle().take(max_total_attempts);
let mut saved_rendezvous = None;
loop {
let mut ipt_use_started = None::<Instant>;
let outcome = async {
if saved_rendezvous.is_none() {
debug!("hs conn to {}: setting up rendezvous point", &self.hsid);
let Some(_): Option<usize> = rend_attempts.next() else { return Ok(None) };
let mut using_rend_pt = None;
saved_rendezvous = Some(
self.runtime
.timeout(rend_timeout, self.establish_rendezvous(&mut using_rend_pt))
.await
.map_err(|_: TimeoutError| match using_rend_pt {
None => FAE::RendezvousCircuitObtain {
error: tor_circmgr::Error::CircTimeout,
},
Some(rend_pt) => FAE::RendezvousEstablishTimeout { rend_pt },
})??,
);
}
let Some(ipt) = intro_attempts.next() else { return Ok(None) };
let intro_index = ipt.intro_index;
ipt_use_started = Some(self.runtime.now());
let rend_pt_for_error = rend_pt_identity_for_error(
&saved_rendezvous
.as_ref()
.expect("just made Some")
.rend_relay,
);
debug!(
"hs conn to {}: RPT {}",
&self.hsid,
rend_pt_for_error.as_inner()
);
let (rendezvous, introduced) = self
.runtime
.timeout(
intro_timeout,
self.exchange_introduce(ipt, &mut saved_rendezvous),
)
.await
.map_err(|_: TimeoutError| {
FAE::IntroductionTimeout { intro_index }
})?
?;
#[allow(unused_variables)] let saved_rendezvous = ();
let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
let circ = self
.runtime
.timeout(
rpt_ipt_timeout,
self.complete_rendezvous(ipt, rendezvous, introduced),
)
.await
.map_err(|_: TimeoutError| FAE::RendezvousCompletionTimeout {
intro_index,
rend_pt: rend_pt.clone(),
})??;
debug!(
"hs conn to {}: RPT {} IPT {}: success",
&self.hsid,
rend_pt.as_inner(),
intro_index,
);
Ok::<_, FAE>(Some((intro_index, circ)))
}
.await;
#[allow(clippy::unused_unit)] let mut store_experience = |intro_index, outcome| -> () {
(|| {
let ipt = usable_intros
.iter()
.find(|ipt| ipt.intro_index == intro_index)
.ok_or_else(|| internal!("IPT not found by index"))?;
let id = RelayIdForExperience::for_store(&ipt.intro_target)?;
let started = ipt_use_started.ok_or_else(|| {
internal!("trying to record IPT use but no IPT start time noted")
})?;
let duration = self
.runtime
.now()
.checked_duration_since(started)
.ok_or_else(|| internal!("clock overflow calculating IPT use duration"))?;
data.insert(id, IptExperience { duration, outcome });
Ok::<_, Bug>(())
})()
.unwrap_or_else(|e| warn!("error recording HS IPT use experience: {}", e.report()));
};
match outcome {
Ok(Some((intro_index, y))) => {
store_experience(intro_index, Ok(()));
return Ok(y);
}
Ok(None) => return Err(CE::Failed(errors)),
Err(error) => {
debug!(
"hs conn to {}: attempt failed: {}",
&self.hsid,
error.report(),
);
if let Some(intro_index) = error.intro_index() {
store_experience(intro_index, Err(error.retry_time()));
}
errors.push(tor_error::Report(error));
}
}
}
}
/// Obtains a rendezvous circuit and establishes a rendezvous point on it.
///
/// Gets a circuit and its final relay from the circuit pool, sends
/// ESTABLISH_RENDEZVOUS with a freshly generated cookie, and waits for
/// RENDEZVOUS_ESTABLISHED. The message handler installed on the circuit also
/// delivers a later RENDEZVOUS2 to the `rend2_rx` of the returned `Rendezvous`.
///
/// `using_rend_pt` is set to the identity of the rendezvous relay as soon as
/// it is known, so the caller can name it in an error if this future is
/// dropped (for example on timeout).
///
/// No timeout is applied here; the caller wraps this call in one.
///
/// # Errors
///
/// * `FAE::RendezvousCircuitObtain` if the circuit could not be obtained.
/// * `FAE::RendezvousEstablish` if sending the message or receiving the reply fails.
async fn establish_rendezvous(
&'c self,
using_rend_pt: &mut Option<RendPtIdentityForError>,
) -> Result<Rendezvous<R, M>, FAE> {
let (rend_circ, rend_relay) = self
.circpool
.get_or_launch_client_rend(&self.netdir)
.await
.map_err(|error| FAE::RendezvousCircuitObtain { error })?;
let rend_pt = rend_pt_identity_for_error(&rend_relay);
// Tell the caller which relay we are using before doing anything that can stall.
*using_rend_pt = Some(rend_pt.clone());
let rend_cookie: RendCookie = self.mocks.thread_rng().gen();
let message = EstablishRendezvous::new(rend_cookie);
let (rend_established_tx, rend_established_rx) = proto_oneshot::channel();
let (rend2_tx, rend2_rx) = proto_oneshot::channel();
// Handler for the messages arriving on the rendezvous circuit.
struct Handler {
rend_established_tx: proto_oneshot::Sender<RendezvousEstablished>,
rend2_tx: proto_oneshot::Sender<Rendezvous2>,
}
impl MsgHandler for Handler {
fn handle_msg(
&mut self,
msg: AnyRelayMsg,
) -> Result<MetaCellDisposition, tor_proto::Error> {
// While the first channel still expects its message, the incoming message goes there
// and the handler stays installed; after that, the message goes to the RENDEZVOUS2
// channel and the handler is uninstalled.
if self.rend_established_tx.still_expected() {
self.rend_established_tx
.deliver_expected_message(msg, MetaCellDisposition::Consumed)
} else {
self.rend2_tx
.deliver_expected_message(msg, MetaCellDisposition::UninstallHandler)
}
}
}
debug!(
"hs conn to {}: RPT {}: sending ESTABLISH_RENDEZVOUS",
&self.hsid,
rend_pt.as_inner(),
);
// Wraps a protocol error with the identity of the rendezvous point.
let handle_proto_error = |error| FAE::RendezvousEstablish {
error,
rend_pt: rend_pt.clone(),
};
let handler = Handler {
rend_established_tx,
rend2_tx,
};
rend_circ
.send_control_message(message.into(), handler)
.await
.map_err(handle_proto_error)?;
// The content of the reply is not used; only its arrival matters here.
let _: RendezvousEstablished = rend_established_rx.recv(handle_proto_error).await?;
debug!(
"hs conn to {}: RPT {}: got RENDEZVOUS_ESTABLISHED",
&self.hsid,
rend_pt.as_inner(),
);
Ok(Rendezvous {
rend_circ,
rend_cookie,
rend_relay,
rend2_rx,
marker: PhantomData,
})
}
/// Introduces us to the service through the introduction point `ipt`.
///
/// Obtains an introduction circuit to `ipt`, builds an INTRODUCE1 message
/// whose encrypted body names the rendezvous point (cookie, ntor onion key and
/// link specifiers), sends it, and waits for a successful INTRODUCE_ACK.
///
/// The rendezvous is taken out of `rendezvous` only after the introduction
/// circuit has been obtained; if obtaining that circuit fails, `rendezvous` is
/// left untouched. Once taken, it is returned on success and dropped on
/// failure.
///
/// No timeout is applied here; the caller wraps this call in one.
///
/// # Errors
///
/// * `FAE::IntroductionCircuitObtain` if the introduction circuit could not be obtained.
/// * `FAE::IntroductionExchange` if sending INTRODUCE1 or receiving the reply fails.
/// * `FAE::IntroductionFailed` if the INTRODUCE_ACK reports a non-success status.
/// * An internal error if `rendezvous` is `None` or a message cannot be encoded.
async fn exchange_introduce(
&'c self,
ipt: &UsableIntroPt<'_>,
rendezvous: &mut Option<Rendezvous<'c, R, M>>,
) -> Result<(Rendezvous<R, M>, Introduced<R, M>), FAE> {
let intro_index = ipt.intro_index;
debug!(
"hs conn to {}: IPT {}: obtaining intro circuit",
&self.hsid, intro_index,
);
let intro_circ = self
.circpool
.get_or_launch_specific(
&self.netdir,
HsCircKind::ClientIntro,
ipt.intro_target.clone(), )
.await
.map_err(|error| FAE::IntroductionCircuitObtain { error, intro_index })?;
// From here on the rendezvous is committed to this introduction attempt.
let rendezvous = rendezvous.take().ok_or_else(|| internal!("no rend"))?;
let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
debug!(
"hs conn to {}: RPT {} IPT {}: making introduction",
&self.hsid,
rend_pt.as_inner(),
intro_index,
);
// Encoding of an INTRODUCE1 with an empty body: this header is what is passed
// to the handshake together with the payload.
let intro_header = {
let ipt_sid_key = ipt.intro_desc.ipt_sid_key();
let intro1 = Introduce1::new(
AuthKeyType::ED25519_SHA3_256,
ipt_sid_key.as_bytes().to_vec(),
vec![],
);
let mut header = vec![];
intro1
.encode_onto(&mut header)
.map_err(into_internal!("couldn't encode intro1 header"))?;
header
};
// Payload telling the service where to meet us: cookie, rendezvous relay's onion key and link specifiers.
let intro_payload = {
let onion_key =
intro_payload::OnionKey::NtorOnionKey(*rendezvous.rend_relay.ntor_onion_key());
let linkspecs = rendezvous
.rend_relay
.linkspecs()
.map_err(into_internal!("Couldn't encode link specifiers"))?;
let payload =
IntroduceHandshakePayload::new(rendezvous.rend_cookie, onion_key, linkspecs);
let mut encoded = vec![];
payload
.write_onto(&mut encoded)
.map_err(into_internal!("Couldn't encode introduce1 payload"))?;
encoded
};
let service_info = hs_ntor::HsNtorServiceInfo::new(
ipt.intro_desc.svc_ntor_key().clone(),
ipt.intro_desc.ipt_sid_key().clone(),
self.subcredential,
);
// Start the hs-ntor handshake; its state is kept for completing the rendezvous later.
let handshake_state =
hs_ntor::HsNtorClientState::new(&mut self.mocks.thread_rng(), service_info);
let encrypted_body = handshake_state
.client_send_intro(&intro_header, &intro_payload)
.map_err(into_internal!("can't begin hs-ntor handshake"))?;
// The message actually sent: same key type and key as the header above, now with the encrypted body.
let intro1_real = Introduce1::new(
AuthKeyType::ED25519_SHA3_256,
ipt.intro_desc.ipt_sid_key().as_bytes().to_vec(),
encrypted_body,
);
// Handler for the single reply expected on the introduction circuit.
struct Handler {
intro_ack_tx: proto_oneshot::Sender<IntroduceAck>,
}
impl MsgHandler for Handler {
fn handle_msg(
&mut self,
msg: AnyRelayMsg,
) -> Result<MetaCellDisposition, tor_proto::Error> {
self.intro_ack_tx
.deliver_expected_message(msg, MetaCellDisposition::UninstallHandler)
}
}
// Wraps a protocol error with the index of the introduction point.
let handle_intro_proto_error = |error| FAE::IntroductionExchange { error, intro_index };
let (intro_ack_tx, intro_ack_rx) = proto_oneshot::channel();
let handler = Handler { intro_ack_tx };
debug!(
"hs conn to {}: RPT {} IPT {}: making introduction - sending INTRODUCE1",
&self.hsid,
rend_pt.as_inner(),
intro_index,
);
intro_circ
.send_control_message(intro1_real.into(), handler)
.await
.map_err(handle_intro_proto_error)?;
// Only a successful ack counts; any other status becomes `IntroductionFailed`.
let _: IntroduceAck = intro_ack_rx
.recv(handle_intro_proto_error)
.await?
.success()
.map_err(|status| FAE::IntroductionFailed {
status,
intro_index,
})?;
debug!(
"hs conn to {}: RPT {} IPT {}: making introduction - success",
&self.hsid,
rend_pt.as_inner(),
intro_index,
);
// The introduction circuit is not needed after the ack.
drop(intro_circ);
Ok((
rendezvous,
Introduced {
handshake_state,
marker: PhantomData,
},
))
}
/// Completes the rendezvous and returns the circuit to the service.
///
/// Waits for RENDEZVOUS2 on the rendezvous circuit, finishes the hs-ntor
/// handshake begun in `exchange_introduce` using the handshake information it
/// carries, and extends the rendezvous circuit with a virtual hop keyed from
/// that handshake.
///
/// `ipt` is used only to name the introduction point in errors and log messages.
///
/// No timeout is applied here; the caller wraps this call in one.
///
/// # Errors
///
/// * `FAE::RendezvousCompletionCircuitError` if receiving RENDEZVOUS2 fails.
/// * `FAE::RendezvousCompletionHandshake` if the handshake cannot be completed.
/// * An internal error if adding the virtual hop fails.
async fn complete_rendezvous(
&'c self,
ipt: &UsableIntroPt<'_>,
rendezvous: Rendezvous<'c, R, M>,
introduced: Introduced<R, M>,
) -> Result<Arc<ClientCirc!(R, M)>, FAE> {
use tor_proto::circuit::handshake;
let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
let intro_index = ipt.intro_index;
// Wraps a protocol error with the identities of both the introduction and rendezvous points.
let handle_proto_error = |error| FAE::RendezvousCompletionCircuitError {
error,
intro_index,
rend_pt: rend_pt.clone(),
};
debug!(
"hs conn to {}: RPT {} IPT {}: awaiting rendezvous completion",
&self.hsid,
rend_pt.as_inner(),
intro_index,
);
let rend2_msg: Rendezvous2 = rendezvous.rend2_rx.recv(handle_proto_error).await?;
debug!(
"hs conn to {}: RPT {} IPT {}: received RENDEZVOUS2",
&self.hsid,
rend_pt.as_inner(),
intro_index,
);
let handshake_state = introduced.handshake_state;
// Finish the handshake; the result generates the keys for the virtual hop.
let keygen = handshake_state
.client_receive_rend(rend2_msg.handshake_info())
.map_err(|error| FAE::RendezvousCompletionHandshake {
error,
intro_index,
rend_pt: rend_pt.clone(),
})?;
let params = circparameters_from_netparameters(self.netdir.params());
rendezvous
.rend_circ
.extend_virtual(
handshake::RelayProtocol::HsV3,
handshake::HandshakeRole::Initiator,
keygen,
params,
)
.await
// NOTE(review): reported as an internal error, although the message itself
// suggests that a closed circuit is the likely cause.
.map_err(into_internal!(
"actually this is probably a 'circuit closed' error" ))?;
debug!(
"hs conn to {}: RPT {} IPT {}: HS circuit established",
&self.hsid,
rend_pt.as_inner(),
intro_index,
);
Ok(rendezvous.rend_circ)
}
/// Estimates a timeout for a sequence of actions.
///
/// Each entry of `actions` is a repeat count and an action. The circuit
/// pool's estimate for the action is multiplied by the count, and the
/// products are summed. Both operations saturate rather than overflow; an
/// empty slice gives `Duration::ZERO`.
fn estimate_timeout(&self, actions: &[(u32, TimeoutsAction)]) -> Duration {
    let mut total = Duration::ZERO;
    for (count, action) in actions {
        let each = self.circpool.estimate_timeout(action);
        total = total.saturating_add(each.saturating_mul(*count));
    }
    total
}
}
/// The pieces of `Context` that tests can replace: circuit pool type, RNG, and observation hooks.
///
/// The `test_*` hooks have empty default bodies, so an implementation only
/// overrides the ones it cares about.
trait MocksForConnect<R>: Clone {
/// Circuit pool implementation to use.
type HsCircPool: MockableCircPool<R>;
/// Random number generator type returned by `thread_rng`.
type Rng: rand::Rng + rand::CryptoRng;
/// Called with the descriptor once one has been obtained.
fn test_got_desc(&self, _: &HsDesc) {}
/// Called with the circuit once the connection has succeeded.
fn test_got_circ(&self, _: &Arc<ClientCirc!(R, Self)>) {}
/// Called with the usable introduction points, after they have been sorted.
fn test_got_ipts(&self, _: &[UsableIntroPt]) {}
/// Returns a random number generator.
fn thread_rng(&self) -> Self::Rng;
}
/// The circuit pool operations used by this module, as a trait so that tests can substitute a mock.
#[async_trait]
trait MockableCircPool<R> {
/// Circuit type handed out by this pool.
type ClientCirc: MockableClientCirc;
/// Obtains a circuit of the given `kind` to `target`.
async fn get_or_launch_specific(
&self,
netdir: &NetDir,
kind: HsCircKind,
target: impl CircTarget + Send + Sync + 'async_trait,
) -> tor_circmgr::Result<Arc<Self::ClientCirc>>;
/// Obtains a circuit for use as a rendezvous circuit, together with the relay used as the rendezvous point.
async fn get_or_launch_client_rend<'a>(
&self,
netdir: &'a NetDir,
) -> tor_circmgr::Result<(Arc<Self::ClientCirc>, Relay<'a>)>;
/// Returns a timeout estimate for `action`.
fn estimate_timeout(&self, action: &TimeoutsAction) -> Duration;
}
/// The circuit operations used by this module, as a trait so that tests can substitute a mock.
#[async_trait]
trait MockableClientCirc: Debug {
    /// Stream type returned by `begin_dir_stream`.
    type DirStream: AsyncRead + AsyncWrite + Send + Unpin;
    /// Opens a directory stream on this circuit.
    async fn begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream>;
    /// Sends `msg` on this circuit and installs `reply_handler` to receive the replies.
    async fn send_control_message(
        &self,
        msg: AnyRelayMsg,
        reply_handler: impl MsgHandler + Send + 'static,
    ) -> tor_proto::Result<()>;
    /// Adds a virtual hop to this circuit, keyed from `handshake`.
    ///
    /// The second parameter used to be declared with the name `protocol` as
    /// well; it is the handshake role, and is named `role` to match the
    /// implementations of this trait.
    async fn extend_virtual(
        &self,
        protocol: tor_proto::circuit::handshake::RelayProtocol,
        role: tor_proto::circuit::handshake::HandshakeRole,
        handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
        params: CircParameters,
    ) -> tor_proto::Result<()>;
}
/// The non-mock implementation: the real `HsCircPool`, the thread-local RNG, and no-op test hooks.
impl<R: Runtime> MocksForConnect<R> for () {
type HsCircPool = HsCircPool<R>;
type Rng = rand::rngs::ThreadRng;
fn thread_rng(&self) -> Self::Rng {
rand::thread_rng()
}
}
/// Implements the mockable pool trait for the real `HsCircPool` by forwarding to its inherent methods.
#[async_trait]
impl<R: Runtime> MockableCircPool<R> for HsCircPool<R> {
type ClientCirc = ClientCirc;
async fn get_or_launch_specific(
&self,
netdir: &NetDir,
kind: HsCircKind,
target: impl CircTarget + Send + Sync + 'async_trait,
) -> tor_circmgr::Result<Arc<ClientCirc>> {
// Path-qualified call: names the `HsCircPool` function explicitly rather than using method-call syntax.
HsCircPool::get_or_launch_specific(self, netdir, kind, target).await
}
async fn get_or_launch_client_rend<'a>(
&self,
netdir: &'a NetDir,
) -> tor_circmgr::Result<(Arc<ClientCirc>, Relay<'a>)> {
HsCircPool::get_or_launch_client_rend(self, netdir).await
}
fn estimate_timeout(&self, action: &TimeoutsAction) -> Duration {
HsCircPool::estimate_timeout(self, action)
}
}
/// Implements the mockable circuit trait for the real `ClientCirc` by forwarding to its inherent methods.
#[async_trait]
impl MockableClientCirc for ClientCirc {
type DirStream = tor_proto::stream::DataStream;
async fn begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream> {
// Path-qualified call: names the `ClientCirc` function explicitly rather than using method-call syntax.
ClientCirc::begin_dir_stream(self).await
}
async fn send_control_message(
&self,
msg: AnyRelayMsg,
reply_handler: impl MsgHandler + Send + 'static,
) -> tor_proto::Result<()> {
ClientCirc::send_control_message(self, msg, reply_handler).await
}
async fn extend_virtual(
&self,
protocol: tor_proto::circuit::handshake::RelayProtocol,
role: tor_proto::circuit::handshake::HandshakeRole,
handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
params: CircParameters,
) -> tor_proto::Result<()> {
ClientCirc::extend_virtual(self, protocol, role, handshake, params).await
}
}
/// Connects `Data` to the connector's state machinery: the real circuit type, no mock global state.
#[async_trait]
impl MockableConnectorData for Data {
type ClientCirc = ClientCirc;
type MockGlobalState = ();
/// Forwards to the module-level `connect` function.
async fn connect<R: Runtime>(
connector: &HsClientConnector<R>,
netdir: Arc<NetDir>,
config: Arc<Config>,
hsid: HsId,
data: &mut Self,
secret_keys: HsClientSecretKeys,
) -> Result<Arc<Self::ClientCirc>, ConnError> {
connect(connector, netdir, config, hsid, data, secret_keys).await
}
/// A circuit counts as OK for as long as it is not closing.
fn circuit_is_ok(circuit: &Self::ClientCirc) -> bool {
!circuit.is_closing()
}
}
#[cfg(test)]
mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_duration_subtraction)]
#![allow(dead_code, unused_variables)]
use super::*;
use crate::*;
use futures::FutureExt as _;
use std::ops::{Bound, RangeBounds};
use std::{iter, panic::AssertUnwindSafe};
use tokio_crate as tokio;
use tor_async_utils::JoinReadWrite;
use tor_basic_utils::test_rng::{testing_rng, TestingRng};
use tor_llcrypto::pk::curve25519;
use tor_netdoc::doc::{hsdesc::test_data, netstatus::Lifetime};
use tor_rtcompat::{tokio::TokioNativeTlsRuntime, CompoundRuntime};
use tor_rtmock::time::MockSleepProvider;
use tracing_test::traced_test;
/// State shared between all clones of the test mocks, for the test to inspect afterwards.
#[derive(Debug, Default)]
struct MocksGlobal {
/// Every target passed to the mock `get_or_launch_specific`, in call order.
hsdirs_asked: Vec<OwnedCircTarget>,
/// The descriptor reported through `test_got_desc`, if any.
got_desc: Option<HsDesc>,
}
/// Test mock; every clone shares the same `MocksGlobal`.
#[derive(Clone, Debug)]
struct Mocks<I> {
/// Shared state.
mglobal: Arc<Mutex<MocksGlobal>>,
/// Per-instance value; `()` for all the mocks implemented in this module.
id: I,
}
impl<I> Mocks<I> {
/// Returns a mock sharing this one's global state, whose `id` is `f` applied to this one's `id`.
fn map_id<J>(&self, f: impl FnOnce(&I) -> J) -> Mocks<J> {
Mocks {
mglobal: self.mglobal.clone(),
id: f(&self.id),
}
}
}
/// Test hooks: use the mock itself as circuit pool, and remember the descriptor that was obtained.
impl<R: Runtime> MocksForConnect<R> for Mocks<()> {
type HsCircPool = Mocks<()>;
type Rng = TestingRng;
/// Stores a clone of the descriptor in the shared state.
fn test_got_desc(&self, desc: &HsDesc) {
self.mglobal.lock().unwrap().got_desc = Some(desc.clone());
}
/// Ignores the introduction points (the parameter, although named `desc`, is the list of them).
fn test_got_ipts(&self, desc: &[UsableIntroPt]) {}
fn thread_rng(&self) -> Self::Rng {
testing_rng()
}
}
/// Mock circuit pool: only HsDir circuits are implemented.
#[async_trait]
impl<R: Runtime> MockableCircPool<R> for Mocks<()> {
type ClientCirc = Mocks<()>;
/// Records `target` in the shared state and returns a clone of the mock as the "circuit".
///
/// Panics (via the assertion) if asked for any kind of circuit other than `ClientHsDir`.
async fn get_or_launch_specific(
&self,
_netdir: &NetDir,
kind: HsCircKind,
target: impl CircTarget + Send + Sync + 'async_trait,
) -> tor_circmgr::Result<Arc<Self::ClientCirc>> {
assert_eq!(kind, HsCircKind::ClientHsDir);
let target = OwnedCircTarget::from_circ_target(&target);
self.mglobal.lock().unwrap().hsdirs_asked.push(target);
Ok(Arc::new(self.clone()))
}
/// Not implemented; panics if called.
async fn get_or_launch_client_rend<'a>(
&self,
netdir: &'a NetDir,
) -> tor_circmgr::Result<(Arc<ClientCirc!(R, Self)>, Relay<'a>)> {
todo!()
}
/// Returns a fixed ten seconds for every action.
fn estimate_timeout(&self, action: &TimeoutsAction) -> Duration {
Duration::from_secs(10)
}
}
/// Mock circuit: only directory streams are implemented.
#[async_trait]
impl MockableClientCirc for Mocks<()> {
    type DirStream = JoinReadWrite<futures::io::Cursor<Box<[u8]>>, futures::io::Sink>;
    /// Returns a stream that yields a canned HTTP response carrying the test
    /// descriptor, and that discards everything written to it.
    async fn begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream> {
        // An HTTP response needs an empty line between the status line and the
        // body. The newlines are written as escapes so that the separator
        // cannot be lost when whitespace in this file is reformatted.
        let response = format!("HTTP/1.1 200 OK\n\n{}", test_data::TEST_DATA_2)
            .into_bytes()
            .into_boxed_slice();
        Ok(JoinReadWrite::new(
            futures::io::Cursor::new(response),
            futures::io::sink(),
        ))
    }
    /// Not implemented; panics if called.
    async fn send_control_message(
        &self,
        msg: AnyRelayMsg,
        reply_handler: impl MsgHandler + Send + 'static,
    ) -> tor_proto::Result<()> {
        todo!()
    }
    /// Not implemented; panics if called.
    async fn extend_virtual(
        &self,
        protocol: tor_proto::circuit::handshake::RelayProtocol,
        role: tor_proto::circuit::handshake::HandshakeRole,
        handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
        params: CircParameters,
    ) -> tor_proto::Result<()> {
        todo!()
    }
}
/// Runs `Context::connect` against the mocks and checks the descriptor-fetching stage:
/// one HsDir was asked, the expected descriptor was obtained, and it was cached with the expected validity bounds.
#[traced_test]
#[tokio::test]
async fn test_connect() {
// Build a test netdir whose lifetime starts at the fixed time also used as "now" below.
let valid_after = humantime::parse_rfc3339("2023-02-09T12:00:00Z").unwrap();
let fresh_until = valid_after + humantime::parse_duration("1 hours").unwrap();
let valid_until = valid_after + humantime::parse_duration("24 hours").unwrap();
let lifetime = Lifetime::new(valid_after, fresh_until, valid_until).unwrap();
let netdir = tor_netdir::testnet::construct_custom_netdir_with_params(
tor_netdir::testnet::simple_net_func,
iter::empty::<(&str, _)>(),
Some(lifetime),
)
.expect("failed to build default testing netdir");
let netdir = Arc::new(netdir.unwrap_if_sufficient().unwrap());
// Real tokio runtime for everything except the sleep provider, which is a mock clock set to `now`.
let runtime = TokioNativeTlsRuntime::current().unwrap();
let now = humantime::parse_rfc3339("2023-02-09T12:00:00Z").unwrap();
let mock_sp = MockSleepProvider::new(now);
let runtime = CompoundRuntime::new(
runtime.clone(),
mock_sp,
runtime.clone(),
runtime.clone(),
runtime,
);
let time_period = netdir.hs_time_period();
let mglobal = Arc::new(Mutex::new(MocksGlobal::default()));
let mocks = Mocks { mglobal, id: () };
let hsid = test_data::TEST_HSID_2.into();
let mut data = Data::default();
// Client keys from the test data, so that the test descriptor can be decrypted.
let pk = curve25519::PublicKey::from(test_data::TEST_PUBKEY_2).into();
let sk = curve25519::StaticSecret::from(test_data::TEST_SECKEY_2).into();
let mut secret_keys_builder = HsClientSecretKeysBuilder::default();
secret_keys_builder.ks_hsc_desc_enc(sk);
let secret_keys = secret_keys_builder.build().unwrap();
let ctx = Context::new(
&runtime,
&mocks,
netdir,
Default::default(),
hsid,
secret_keys,
mocks.clone(),
)
.unwrap();
// The mock rendezvous and circuit-message functions are `todo!()`, so the attempt is expected
// to panic once it gets past the descriptor stage; catch the unwind and inspect what was recorded.
let _got = AssertUnwindSafe(ctx.connect(&mut data))
.catch_unwind() .await;
// Independently parse the test descriptor, to compare with the one `connect` obtained.
let (hs_blind_id_key, subcredential) = HsIdKey::try_from(hsid)
.unwrap()
.compute_blinded_key(time_period)
.unwrap();
let hs_blind_id = hs_blind_id_key.id();
let sk = curve25519::StaticSecret::from(test_data::TEST_SECKEY_2).into();
let hsdesc = HsDesc::parse_decrypt_validate(
test_data::TEST_DATA_2,
&hs_blind_id,
now,
&subcredential,
Some((&pk, &sk)),
)
.unwrap()
.dangerously_assume_timely();
let mglobal = mocks.mglobal.lock().unwrap();
assert_eq!(mglobal.hsdirs_asked.len(), 1);
// The descriptors are compared through their `Debug` output.
assert_eq!(
format!("{:?}", mglobal.got_desc),
format!("{:?}", Some(hsdesc))
);
// The descriptor must have been cached, with no lower time bound and the expected upper one.
let bounds = data.desc.as_ref().unwrap().bounds();
assert_eq!(bounds.start_bound(), Bound::Unbounded);
let desc_valid_until = humantime::parse_rfc3339("2023-02-11T20:00:00Z").unwrap();
assert_eq!(
bounds.end_bound(),
Bound::Included(desc_valid_until).as_ref()
);
}
}