use async_trait::async_trait;
use std::io;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::instrument;
use crate::factory::{BootstrapReporter, ChannelFactory, IncomingChannelFactory};
use crate::transport::TransportImplHelper;
use crate::{Error, event::ChanMgrEventSender};
use safelog::{MaybeSensitive, Sensitive};
use tor_basic_utils::rand_hostname;
use tor_error::internal;
use tor_linkspec::{HasChanMethod, IntoOwnedChanTarget, OwnedChanTarget};
use tor_proto::channel::ChannelType;
use tor_proto::channel::kist::KistParams;
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
use tor_proto::memquota::ChannelAccount;
use tor_rtcompat::SpawnExt;
use tor_rtcompat::{Runtime, TlsProvider, tls::TlsConnector};
#[cfg(feature = "relay")]
use {
futures::{AsyncRead, AsyncWrite},
std::net::IpAddr,
tor_proto::{RelayIdentities, peer::PeerAddr},
tor_rtcompat::{CertifiedConn, StreamOps},
};
pub struct ChanBuilder<R: Runtime, H: TransportImplHelper>
where
R: tor_rtcompat::TlsProvider<H::Stream>,
{
runtime: R,
transport: H,
tls_connector: <R as TlsProvider<H::Stream>>::Connector,
#[cfg(feature = "relay")]
tls_acceptor: Option<<R as TlsProvider<H::Stream>>::Acceptor>,
#[cfg(feature = "relay")]
identities: Option<Arc<RelayIdentities>>,
#[cfg(feature = "relay")]
my_addrs: Vec<IpAddr>,
}
impl<R, H> ChanBuilder<R, H>
where
    R: Runtime + TlsProvider<H::Stream>,
    H: TransportImplHelper,
{
    /// Create a builder that only initiates client channels.
    ///
    /// The TLS connector is obtained from `runtime`; with the `relay` feature
    /// enabled, all relay-only state is left empty.
    pub fn new_client(runtime: R, transport: H) -> Self {
        Self {
            // Must be evaluated before `runtime` is moved into the struct.
            tls_connector: <R as TlsProvider<H::Stream>>::tls_connector(&runtime),
            runtime,
            transport,
            #[cfg(feature = "relay")]
            tls_acceptor: None,
            #[cfg(feature = "relay")]
            identities: None,
            #[cfg(feature = "relay")]
            my_addrs: Vec::new(),
        }
    }

    /// Create a builder that can both initiate and accept relay channels.
    ///
    /// A TLS acceptor is built from the key and certificate held in
    /// `identities`.
    ///
    /// # Errors
    ///
    /// Returns an internal error if the TLS acceptor settings or the TLS
    /// acceptor itself cannot be constructed.
    #[cfg(feature = "relay")]
    pub fn new_relay(
        runtime: R,
        transport: H,
        identities: Arc<RelayIdentities>,
        my_addrs: Vec<IpAddr>,
    ) -> crate::Result<Self> {
        use tor_error::into_internal;
        use tor_rtcompat::tls::TlsAcceptorSettings;

        let key_and_cert = identities.tls_key_and_cert().clone();
        let settings = TlsAcceptorSettings::new(key_and_cert)
            .map_err(into_internal!("Unable to build TLS acceptor setting"))?;
        let acceptor = <R as TlsProvider<H::Stream>>::tls_acceptor(&runtime, settings)
            .map_err(into_internal!("Unable to build TLS acceptor"))?;

        // Start from a client builder, then fill in the relay-only state.
        let mut this = Self::new_client(runtime, transport);
        this.tls_acceptor = Some(acceptor);
        this.identities = Some(identities);
        this.my_addrs = my_addrs;
        Ok(this)
    }

    /// Return a fresh relay builder that uses `identities`, reusing clones of
    /// this builder's runtime, transport and addresses.
    ///
    /// # Errors
    ///
    /// Fails in the same cases as [`ChanBuilder::new_relay`].
    #[cfg(feature = "relay")]
    pub fn rebuild_with_identities(&self, identities: Arc<RelayIdentities>) -> crate::Result<Self>
    where
        H: Clone,
    {
        let runtime = self.runtime.clone();
        let transport = self.transport.clone();
        let my_addrs = self.my_addrs.clone();
        Self::new_relay(runtime, transport, identities, my_addrs)
    }

    /// Return the kind of channel this builder creates for outbound
    /// connections: relay-initiated when relay identities are configured,
    /// client-initiated otherwise.
    fn outbound_chan_type(&self) -> ChannelType {
        #[cfg(feature = "relay")]
        let acts_as_relay = self.identities.is_some();
        #[cfg(not(feature = "relay"))]
        let acts_as_relay = false;

        if acts_as_relay {
            ChannelType::RelayInitiator
        } else {
            ChannelType::ClientInitiator
        }
    }
}
#[async_trait]
impl<R: Runtime, H: TransportImplHelper> ChannelFactory for ChanBuilder<R, H>
where
R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
H: Send + Sync,
{
#[instrument(skip_all, level = "trace")]
async fn connect_via_transport(
&self,
target: &OwnedChanTarget,
reporter: BootstrapReporter,
memquota: ChannelAccount,
) -> crate::Result<Arc<tor_proto::channel::Channel>> {
use tor_rtcompat::SleepProviderExt;
let delay = if target.chan_method().is_direct() {
std::time::Duration::new(5, 0)
} else {
std::time::Duration::new(10, 0)
};
self.runtime
.timeout(delay, self.connect_no_timeout(target, reporter.0, memquota))
.await
.map_err(|_| Error::ChanTimeout {
peer: target.to_logged(),
})?
}
}
#[async_trait]
impl<R: Runtime, H: TransportImplHelper> IncomingChannelFactory for ChanBuilder<R, H>
where
    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
    H: Send + Sync,
{
    /// Incoming streams have the same type as the streams the transport opens.
    type Stream = H::Stream;

    /// Run the responder side of a channel handshake on an accepted `stream`.
    ///
    /// Steps, in order:
    /// 1. negotiate TLS with the configured acceptor;
    /// 2. run the relay responder handshake;
    /// 3. if the resulting channel is verifiable, verify it against our own
    ///    TLS certificate and the current wall-clock time;
    /// 4. spawn the channel reactor on the runtime.
    ///
    /// # Errors
    ///
    /// * An internal error if this builder has no TLS acceptor or no relay
    ///   identities, or if the TLS session does not expose our certificate.
    /// * `Error::Io` if TLS negotiation or certificate retrieval fails.
    /// * `Error::Proto` if the handshake, verification or finishing step fails.
    /// * A spawn error if the reactor task cannot be launched.
    #[cfg(feature = "relay")]
    async fn accept_from_transport(
        &self,
        peer: Sensitive<std::net::SocketAddr>,
        stream: Self::Stream,
        memquota: ChannelAccount,
    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
        use tor_linkspec::OwnedChanTargetBuilder;
        use tor_proto::relay::MaybeVerifiableRelayResponderChannel;

        // We only know the peer's socket address at this point, so the target
        // used for verification and error reporting carries no identities.
        let target_no_ids = OwnedChanTargetBuilder::default()
            .addrs(vec![peer.into_inner()])
            .build()
            .map_err(|e| internal!("Unable to build chan target from peer sockaddr: {e}"))?;
        let peer_addr: MaybeSensitive<PeerAddr> =
            MaybeSensitive::sensitive(peer.into_inner().into());

        // Helpers that attach the peer to I/O and protocol errors.
        let map_ioe = |ioe, action| Error::Io {
            action,
            peer: peer_addr.clone(),
            source: ioe,
        };
        let map_proto = |source, target: &OwnedChanTarget, clock_skew| Error::Proto {
            source,
            peer: target.to_logged(),
            clock_skew,
        };

        // `ok_or_else` keeps the internal error from being constructed on the
        // success path of every accepted connection.
        let tls = self
            .tls_acceptor
            .as_ref()
            .ok_or_else(|| internal!("Accepting connection without TLS acceptor"))?
            .negotiate_unvalidated(stream, "ignored")
            .await
            .map_err(|e| map_ioe(e.into(), "TLS negotiation"))?;

        let identities = self
            .identities
            .as_ref()
            .ok_or_else(|| internal!("Unable to build relay channel without identities"))?
            .clone();

        let our_cert = tls
            .own_certificate()
            .map_err(|e| map_ioe(e.into(), "TLS Certs"))?
            .ok_or_else(|| Error::Internal(internal!("TLS connection without our certificate")))?
            .into_owned();

        let builder = tor_proto::RelayChannelBuilder::new();
        let unverified = builder
            .accept(
                Sensitive::new(peer_addr.inner()),
                self.my_addrs.clone(),
                tls,
                self.runtime.clone(),
                identities,
                memquota,
            )
            .handshake(|| self.runtime.wallclock())
            .await
            .map_err(|e| map_proto(e, &target_no_ids, None))?;

        let (chan, reactor) = match unverified {
            MaybeVerifiableRelayResponderChannel::Verifiable(c) => {
                // The skew is attached to any error from here on.
                let clock_skew = c.clock_skew();
                let now = self.runtime.wallclock();
                c.verify(&target_no_ids, &our_cert, Some(now))
                    .map_err(|e| map_proto(e, &target_no_ids, Some(clock_skew)))?
                    .finish()
                    .await
                    .map_err(|e| map_proto(e, &target_no_ids, Some(clock_skew)))?
            }
            MaybeVerifiableRelayResponderChannel::NonVerifiable(c) => {
                c.finish().map_err(|e| map_proto(e, &target_no_ids, None))?
            }
        };

        // The reactor drives the channel; its final result is discarded.
        self.runtime
            .spawn(async {
                let _ = reactor.run().await;
            })
            .map_err(|e| Error::from_spawn("responder channel reactor", e))?;
        Ok(chan)
    }
}
impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
where
    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
    H: Send + Sync,
{
    /// Open a channel to `target` without applying any time limit.
    ///
    /// Steps, in order:
    /// 1. record the attempt and, for relays, refuse targets that are us;
    /// 2. open the underlying stream through the transport;
    /// 3. negotiate TLS and fetch the peer certificate;
    /// 4. run and verify the channel handshake as a client or as a relay,
    ///    depending on [`Self::outbound_chan_type`];
    /// 5. spawn the channel reactor.
    ///
    /// Progress is reported on `event_sender` after the attempt starts, after
    /// the transport connects, after TLS finishes, and after the handshake
    /// completes. A handshake rejected because of expired certificates is
    /// reported as "done with skewed clock".
    ///
    /// # Errors
    ///
    /// Returns transport errors unchanged, `Error::Io` for TLS failures,
    /// `Error::Proto` for handshake failures, `Error::UnusableTarget` when a
    /// relay is asked to use a non-direct channel method, and internal errors
    /// for unexpected states.
    ///
    /// # Panics
    ///
    /// Panics if the `event_sender` mutex is poisoned.
    #[instrument(skip_all, level = "trace")]
    async fn connect_no_timeout(
        &self,
        target: &OwnedChanTarget,
        event_sender: Arc<Mutex<ChanMgrEventSender>>,
        memquota: ChannelAccount,
    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
        use tor_rtcompat::tls::CertifiedConn;

        event_sender.lock().expect("Lock poisoned").record_attempt();

        #[cfg(feature = "relay")]
        self.validate_relay_target(target)?;

        let (peer_addr, stream) = self.transport.connect(target).await?;

        // Decide once which role we play; both the address sensitivity and
        // the handshake below depend on it.
        let outbound_chan_type = self.outbound_chan_type();

        // The peer address is treated as sensitive for client-initiated
        // channels and as not sensitive for relay-initiated ones.
        let peer_addr = match outbound_chan_type {
            ChannelType::ClientInitiator => MaybeSensitive::sensitive(peer_addr),
            ChannelType::RelayInitiator => MaybeSensitive::not_sensitive(peer_addr),
            _ => return Err(Error::Internal(internal!("Unknown outbound channel type"))),
        };

        // Helpers that attach the peer to I/O and protocol errors.
        let map_ioe = |action: &'static str| {
            let peer = peer_addr.clone();
            move |ioe: io::Error| Error::Io {
                action,
                peer,
                source: ioe.into(),
            }
        };
        let map_proto = |source, target: &OwnedChanTarget, clock_skew| Error::Proto {
            source,
            peer: target.to_logged(),
            clock_skew,
        };

        event_sender
            .lock()
            .expect("Lock poisoned")
            .record_tcp_success();

        // A freshly generated random hostname is handed to the TLS connector.
        let hostname = rand_hostname::random_hostname(&mut rand::rng());
        let tls = self
            .tls_connector
            .negotiate_unvalidated(stream, hostname.as_str())
            .await
            .map_err(map_ioe("TLS negotiation"))?;

        let peer_cert = tls
            .peer_certificate()
            .map_err(map_ioe("TLS certs"))?
            .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?
            .into_owned();

        event_sender
            .lock()
            .expect("Lock poisoned")
            .record_tls_finished();

        let chan = match outbound_chan_type {
            ChannelType::ClientInitiator => {
                let mut builder = tor_proto::ClientChannelBuilder::new();
                builder.set_declared_method(target.chan_method());
                let peer_addr = Sensitive::new(peer_addr.inner());

                let unverified = builder
                    .launch(tls, self.runtime.clone(), memquota)
                    .connect(|| self.runtime.wallclock())
                    .await
                    .map_err(|e| Error::from_proto_no_skew(e, target))?;

                // Sample the clock after the handshake, as the relay path
                // does, so verification uses the time at which it happens.
                let now = self.runtime.wallclock();
                let clock_skew = unverified.clock_skew();
                let (chan, reactor) = unverified
                    .verify(target, &peer_cert, Some(now))
                    .map_err(|source| match &source {
                        tor_proto::Error::HandshakeCertsExpired { .. } => {
                            event_sender
                                .lock()
                                .expect("Lock poisoned")
                                .record_handshake_done_with_skewed_clock();
                            map_proto(source, target, Some(clock_skew))
                        }
                        _ => Error::from_proto_no_skew(source, target),
                    })?
                    .finish(peer_addr)
                    .await
                    .map_err(|e| map_proto(e, target, Some(clock_skew)))?;

                // The reactor drives the channel; its final result is discarded.
                self.runtime
                    .spawn(async {
                        let _ = reactor.run().await;
                    })
                    .map_err(|e| Error::from_spawn("client channel reactor", e))?;
                chan
            }
            #[cfg(feature = "relay")]
            ChannelType::RelayInitiator => {
                // NOTE(review): this check only runs after the transport
                // connection and TLS negotiation above have already happened;
                // consider rejecting non-direct targets before connecting.
                if !target.chan_method().is_direct() {
                    return Err(Error::UnusableTarget(tor_error::bad_api_usage!(
                        "Relays don't support outbound PT channels"
                    )));
                }
                self.build_relay_channel(
                    tls,
                    peer_addr.inner(),
                    target,
                    &peer_cert,
                    memquota,
                    Arc::clone(&event_sender),
                )
                .await?
            }
            _ => {
                return Err(Error::Internal(internal!(
                    "Unusable channel type for outbound: {outbound_chan_type}",
                )));
            }
        };

        event_sender
            .lock()
            .expect("Lock poisoned")
            .record_handshake_done();
        Ok(chan)
    }

    /// Refuse to open a channel to a target that shares any relay identity
    /// with us.
    ///
    /// Always succeeds when this builder has no relay identities.
    ///
    /// # Errors
    ///
    /// Returns `Error::Proto` wrapping a channel-protocol error when `target`
    /// carries one of our own identities.
    #[cfg(feature = "relay")]
    fn validate_relay_target(&self, target: &OwnedChanTarget) -> crate::Result<()> {
        use tor_linkspec::HasRelayIds;

        let Some(identities) = &self.identities else {
            return Ok(());
        };
        if identities.has_any_relay_id_from(target) {
            Err(Error::Proto {
                source: tor_proto::Error::ChanProto(
                    "Refusing to build channel to ourselves".into(),
                ),
                peer: target.clone().into(),
                clock_skew: None,
            })
        } else {
            Ok(())
        }
    }

    /// Run the relay-initiator handshake over an established TLS session,
    /// verify the peer, and spawn the channel reactor.
    ///
    /// `peer_cert` is the certificate the peer presented during TLS. A
    /// verification failure caused by expired handshake certificates is
    /// reported on `event_sender` as "done with skewed clock".
    ///
    /// # Errors
    ///
    /// Returns an internal error if no relay identities are configured,
    /// `Error::Proto` for handshake, verification or finishing failures, and a
    /// spawn error if the reactor task cannot be launched.
    ///
    /// # Panics
    ///
    /// Panics if the `event_sender` mutex is poisoned.
    #[cfg(feature = "relay")]
    async fn build_relay_channel<T>(
        &self,
        tls: T,
        peer_addr: PeerAddr,
        target: &OwnedChanTarget,
        peer_cert: &[u8],
        memquota: ChannelAccount,
        event_sender: Arc<Mutex<ChanMgrEventSender>>,
    ) -> crate::Result<Arc<tor_proto::channel::Channel>>
    where
        T: AsyncRead + AsyncWrite + CertifiedConn + StreamOps + Send + Unpin + 'static,
    {
        let builder = tor_proto::RelayChannelBuilder::new();

        // `ok_or_else` keeps the internal error from being constructed when
        // the identities are present, which is the normal case.
        let identities = self
            .identities
            .as_ref()
            .ok_or_else(|| internal!("Unable to build relay channel without identities"))?
            .clone();

        let unverified = builder
            .launch(
                tls,
                self.runtime.clone(),
                identities,
                self.my_addrs.clone(),
                target,
                memquota,
            )
            .connect(|| self.runtime.wallclock())
            .await
            .map_err(|e| Error::from_proto_no_skew(e, target))?;

        let now = self.runtime.wallclock();
        let clock_skew = unverified.clock_skew();
        let (chan, reactor) = unverified
            .verify(target, peer_cert, Some(now))
            .map_err(|source| match &source {
                tor_proto::Error::HandshakeCertsExpired { .. } => {
                    event_sender
                        .lock()
                        .expect("Lock poisoned")
                        .record_handshake_done_with_skewed_clock();
                    Error::Proto {
                        source,
                        peer: target.to_logged(),
                        clock_skew: Some(clock_skew),
                    }
                }
                _ => Error::from_proto_no_skew(source, target),
            })?
            .finish(peer_addr)
            .await
            .map_err(|source| Error::Proto {
                source,
                peer: target.to_logged(),
                clock_skew: Some(clock_skew),
            })?;

        // The reactor drives the channel; its final result is discarded.
        self.runtime
            .spawn(async {
                let _ = reactor.run().await;
            })
            .map_err(|e| Error::from_spawn("relay channel reactor", e))?;
        Ok(chan)
    }
}
/// Lets the channel manager treat a real `tor_proto` channel as an
/// `AbstractChannel`.
///
/// Every method forwards to the method of the same name on
/// `tor_proto::channel::Channel`, except `is_usable`, which is the negation of
/// `is_closing`. The forwarding calls name the type explicitly so they cannot
/// be mistaken for a recursive call into this trait implementation.
impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
    /// Forwards to `Channel::is_canonical`.
    fn is_canonical(&self) -> bool {
        tor_proto::channel::Channel::is_canonical(self)
    }

    /// Forwards to `Channel::is_canonical_to_peer`.
    fn is_canonical_to_peer(&self) -> bool {
        tor_proto::channel::Channel::is_canonical_to_peer(self)
    }

    /// A channel is usable as long as it is not closing.
    fn is_usable(&self) -> bool {
        let closing = tor_proto::channel::Channel::is_closing(self);
        !closing
    }

    /// Forwards to `Channel::duration_unused`.
    fn duration_unused(&self) -> Option<Duration> {
        tor_proto::channel::Channel::duration_unused(self)
    }

    /// Forwards the padding-parameter updates to the channel.
    fn reparameterize(
        &self,
        updates: Arc<ChannelPaddingInstructionsUpdates>,
    ) -> tor_proto::Result<()> {
        tor_proto::channel::Channel::reparameterize(self, updates)
    }

    /// Forwards the KIST parameters to the channel.
    fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()> {
        tor_proto::channel::Channel::reparameterize_kist(self, kist_params)
    }

    /// Forwards to `Channel::engage_padding_activities`.
    fn engage_padding_activities(&self) {
        tor_proto::channel::Channel::engage_padding_activities(self);
    }
}
// Unit tests for `ChanBuilder`, run against a mock network and mock clock.
#[cfg(test)]
mod test {
// Lint relaxations that apply to test code only.
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
use super::*;
use crate::{
Result,
mgr::{AbstractChannel, AbstractChannelFactory},
};
use futures::StreamExt as _;
use std::net::SocketAddr;
use std::time::{Duration, SystemTime};
use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType};
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use tor_llcrypto::pk::rsa::RsaIdentity;
use tor_proto::channel::Channel;
use tor_proto::memquota::{ChannelAccount, SpecificAccount as _};
use tor_rtcompat::{NetStreamListener, test_with_one_runtime};
use tor_rtmock::{io::LocalStream, net::MockNetwork};
#[allow(deprecated)] use tor_rtmock::MockSleepRuntime;
/// Build a client channel to a mock relay and check the resulting channel.
///
/// One half of the `join!` runs `build_channel` on a client `ChanBuilder`;
/// the other half accepts the connection on the mock listener and answers it
/// with `crate::testing::answer_channel_req`.
#[test]
fn build_ok() -> Result<()> {
use crate::testing::msgs;
// Fixed address, identities and certificate taken from the testing module.
let orport: SocketAddr = msgs::ADDR.parse().unwrap();
let ed: Ed25519Identity = msgs::ED_ID.into();
let rsa: RsaIdentity = msgs::RSA_ID.into();
let client_addr = "192.0.2.17".parse().unwrap();
let tls_cert = msgs::X509_CERT.into();
// The target the client connects to: a direct connection to `orport`.
let target = OwnedChanTarget::builder()
.addrs(vec![orport])
.method(ChannelMethod::Direct(vec![orport]))
.ed_identity(ed)
.rsa_identity(rsa)
.build()
.unwrap();
// Wall-clock time the mock client clock is moved to below.
// NOTE(review): presumably the time at which the canned handshake messages
// are valid; confirm against `crate::testing::msgs`.
let now = SystemTime::UNIX_EPOCH + Duration::new(msgs::NOW, 0);
test_with_one_runtime!(|rt| async move {
// Client and relay each get their own address on a shared mock network.
let network = MockNetwork::new();
let client_rt = network
.builder()
.add_address(client_addr)
.runtime(rt.clone());
#[allow(deprecated)] let client_rt = MockSleepRuntime::new(client_rt);
let relay_rt = network
.builder()
.add_address(orport.ip())
.runtime(rt.clone());
// The relay side listens for TLS connections on `orport`.
let lis = relay_rt.mock_net().listen_tls(&orport, tls_cert).unwrap();
client_rt.jump_to(now);
let transport = crate::transport::DefaultTransport::new(client_rt.clone(), None);
let builder = ChanBuilder::new_client(client_rt, transport);
// Run the client connect and the relay accept concurrently.
let (r1, r2): (Result<Arc<Channel>>, Result<LocalStream>) = futures::join!(
async {
// Client side: build the channel.
builder
.build_channel(
&target,
BootstrapReporter::fake(),
ChannelAccount::new_noop(),
)
.await
},
async {
// Relay side: accept one connection and answer the channel request.
let (mut con, addr) = lis
.incoming()
.next()
.await
.expect("Closed?")
.expect("accept failed");
assert_eq!(client_addr, addr.ip());
crate::testing::answer_channel_req(&mut con)
.await
.expect("answer failed");
Ok(con)
}
);
let chan = r1.unwrap();
// The channel must carry the expected Ed25519 identity and be usable.
assert_eq!(chan.identity(RelayIdType::Ed25519), Some((&ed).into()));
assert!(chan.is_usable());
// The inherent method and the `AbstractChannel` method report the same
// quantity, so successive readings must never decrease.
let dur_unused = Channel::duration_unused(&chan);
let dur_unused_2 = AbstractChannel::duration_unused(chan.as_ref());
let dur_unused_3 = Channel::duration_unused(&chan);
assert!(dur_unused.unwrap() <= dur_unused_2.unwrap());
assert!(dur_unused_2.unwrap() <= dur_unused_3.unwrap());
// The relay half must also have completed without error.
r2.unwrap();
Ok(())
})
}
}