/// Capacity handed to `mpsc::channel` when creating the bounded queue of
/// outgoing [`ChanCell`]s that connects a [`Channel`] to its reactor.
pub const CHANNEL_BUFFER_SIZE: usize = 128;
mod circmap;
mod codec;
mod handshake;
pub mod padding;
pub mod params;
mod reactor;
mod unique_id;
pub use crate::channel::params::*;
use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, Reactor};
pub use crate::channel::unique_id::UniqId;
use crate::util::err::ChannelClosed;
use crate::util::ts::OptTimestamp;
use crate::{circuit, ClockSkew};
use crate::{Error, Result};
use std::pin::Pin;
use std::sync::{Mutex, MutexGuard};
use std::time::Duration;
use tor_cell::chancell::{msg, msg::PaddingNegotiate, ChanCell, CircId};
use tor_error::internal;
use tor_linkspec::{HasRelayIds, OwnedChanTarget};
use tor_rtcompat::SleepProvider;
/// Items that are used privately by this module and are additionally
/// re-exported publicly when the `testing` feature is enabled (see the
/// `cfg`-gated `use testing_exports::*` lines that follow this module).
mod testing_exports {
// The items are `pub` here but are only reachable from outside the crate
// when the `testing` feature re-exports them, hence the lint allowance.
#![allow(unreachable_pub)]
pub use super::reactor::CtrlMsg;
pub use crate::circuit::celltypes::CreateResponse;
}
#[cfg(feature = "testing")]
pub use testing_exports::*;
#[cfg(not(feature = "testing"))]
use testing_exports::*;
use asynchronous_codec as futures_codec;
use futures::channel::{mpsc, oneshot};
use futures::io::{AsyncRead, AsyncWrite};
use educe::Educe;
use futures::{Sink, SinkExt};
use std::result::Result as StdResult;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tracing::trace;
use crate::channel::unique_id::CircUniqIdContext;
#[cfg(test)]
pub(crate) use codec::CodecError;
pub use handshake::{OutboundClientHandshake, UnverifiedChannel, VerifiedChannel};
/// A `Framed` wrapper over a transport `T` that uses
/// [`ChannelCodec`](crate::channel::codec::ChannelCodec) as its codec.
type CellFrame<T> = futures_codec::Framed<T, crate::channel::codec::ChannelCodec>;
/// A handle to an open channel.
///
/// The handle is cheap to clone: it holds two queue senders and a shared
/// pointer to the channel's [`ChannelDetails`].  The receiving ends of both
/// queues are owned by the channel's `Reactor` (see `Channel::new`).
#[derive(Clone, Debug)]
pub struct Channel {
/// Unbounded queue used to send [`CtrlMsg`] control messages to the reactor.
control: mpsc::UnboundedSender<CtrlMsg>,
/// Bounded queue used to send outgoing cells to the reactor.
cell_tx: mpsc::Sender<ChanCell>,
/// Information shared between this handle, its clones, and the reactor.
details: Arc<ChannelDetails>,
}
/// State shared (behind an `Arc`) between every [`Channel`] handle and the
/// channel's reactor.
#[derive(Debug)]
pub(crate) struct ChannelDetails {
/// Identifier returned by `Channel::unique_id`.
unique_id: UniqId,
/// Identity information for the peer; returned by `Channel::target` and
/// consulted by `Channel::check_match`.
peer_id: OwnedChanTarget,
/// Flag read by `Channel::is_closing` and by `start_send`, which refuses
/// to queue cells once it is set.
// NOTE(review): the code that sets this flag is not in this file section
// (presumably the reactor) — confirm.
closed: AtomicBool,
/// Timestamp reported through `Channel::duration_unused`.
// NOTE(review): presumably tracks since when the channel has had no
// circuits; the update policy lives outside this section — confirm.
unused_since: OptTimestamp,
/// Clock skew value supplied when the channel was constructed.
clock_skew: ClockSkew,
/// Instant captured when the channel was constructed; basis for
/// `Channel::age`.
opened_at: coarsetime::Instant,
/// Details that can change after construction, guarded by a mutex.
mutable: Mutex<MutableDetails>,
}
/// The mutex-protected, mutable portion of [`ChannelDetails`].
#[derive(Debug, Default)]
struct MutableDetails {
/// Current padding-control state for the channel.
padding: PaddingControlState,
}
/// Tracks whether padding configuration has been forwarded to the reactor.
///
/// The state starts as `UsageDoesNotImplyPadding` (the default) and moves to
/// `PaddingConfigured` when `Channel::engage_padding_activities` succeeds in
/// sending the accumulated parameters.
#[derive(Debug, Educe)]
#[educe(Default)]
enum PaddingControlState {
/// Padding has not been engaged yet.  Updates passed to
/// `Channel::reparameterize` are merged into `padding_params` rather than
/// being sent to the reactor.
#[educe(Default)]
UsageDoesNotImplyPadding {
/// Accumulated updates, sent once padding is engaged.
padding_params: ChannelPaddingInstructionsUpdates,
},
/// Padding has been engaged; `Channel::reparameterize` now forwards
/// updates to the reactor directly.
PaddingConfigured,
}
use PaddingControlState as PCS;
/// Lets a [`Channel`] be used as a `Sink` of outgoing cells.
///
/// Every operation is delegated to the bounded `cell_tx` queue; any failure
/// reported by that queue is surfaced as a `ChannelClosed` error.
impl Sink<ChanCell> for Channel {
    type Error = Error;

    /// Report whether the cell queue can accept another cell.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let chan = self.get_mut();
        let outcome = Pin::new(&mut chan.cell_tx).poll_ready(cx);
        outcome.map_err(|_| ChannelClosed.into())
    }

    /// Queue `cell` for transmission.
    ///
    /// Fails if the channel is flagged as closed, if `check_cell` rejects the
    /// cell, or if the underlying queue refuses it.
    fn start_send(self: Pin<&mut Self>, cell: ChanCell) -> Result<()> {
        use msg::ChanMsg::*;

        let chan = self.get_mut();
        if chan.details.closed.load(Ordering::SeqCst) {
            return Err(ChannelClosed.into());
        }
        chan.check_cell(&cell)?;

        // Relay and padding cells are not traced; everything else is.
        let untraced = matches!(cell.msg(), Relay(_) | Padding(_) | VPadding(_));
        if !untraced {
            trace!(
                "{}: Sending {} for {}",
                chan.details.unique_id,
                cell.msg().cmd(),
                cell.circid()
            );
        }

        let queued = Pin::new(&mut chan.cell_tx).start_send(cell);
        queued.map_err(|_| ChannelClosed.into())
    }

    /// Flush the cell queue.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let chan = self.get_mut();
        let outcome = Pin::new(&mut chan.cell_tx).poll_flush(cx);
        outcome.map_err(|_| ChannelClosed.into())
    }

    /// Close the sending side of the cell queue.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let chan = self.get_mut();
        let outcome = Pin::new(&mut chan.cell_tx).poll_close(cx);
        outcome.map_err(|_| ChannelClosed.into())
    }
}
/// Builder used to start an outbound client handshake on a transport.
#[derive(Default)]
pub struct ChannelBuilder {
/// The declared method for reaching the target, if one has been set; it is
/// handed to `OutboundClientHandshake::new` by `launch`.
target: Option<tor_linkspec::ChannelMethod>,
}
impl ChannelBuilder {
    /// Create a builder with no declared method.
    pub fn new() -> Self {
        Self::default()
    }

    /// Declare a single socket address as the target.
    ///
    /// Equivalent to calling [`set_declared_method`](Self::set_declared_method)
    /// with a `ChannelMethod::Direct` holding just `target`.
    #[deprecated(note = "use set_declared_method instead", since = "0.7.1")]
    pub fn set_declared_addr(&mut self, target: std::net::SocketAddr) {
        let method = tor_linkspec::ChannelMethod::Direct(vec![target]);
        self.set_declared_method(method);
    }

    /// Record the method by which the target is declared to be reached,
    /// replacing any previously declared one.
    pub fn set_declared_method(&mut self, target: tor_linkspec::ChannelMethod) {
        let declared = Some(target);
        self.target = declared;
    }

    /// Consume the builder and begin an outbound client handshake over `tls`,
    /// using `sleep_prov` as the sleep provider.
    pub fn launch<T, S>(self, tls: T, sleep_prov: S) -> OutboundClientHandshake<T, S>
    where
        T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
        S: SleepProvider,
    {
        let Self { target } = self;
        handshake::OutboundClientHandshake::new(tls, target, sleep_prov)
    }
}
impl Channel {
    /// Build a [`Channel`] handle together with the [`Reactor`] that drives it.
    ///
    /// This creates the control queue (unbounded) and the cell queue (bounded
    /// by [`CHANNEL_BUFFER_SIZE`]), wraps the per-channel information in a
    /// shared [`ChannelDetails`], and hands the receiving ends of both queues
    /// to the reactor.  The reactor starts with a disabled padding timer and a
    /// circuit map allocating from `CircIdRange::High`.
    fn new<S>(
        link_protocol: u16,
        sink: BoxedChannelSink,
        stream: BoxedChannelStream,
        unique_id: UniqId,
        peer_id: OwnedChanTarget,
        clock_skew: ClockSkew,
        sleep_prov: S,
    ) -> (Self, reactor::Reactor<S>)
    where
        S: SleepProvider,
    {
        use circmap::{CircIdRange, CircMap};
        let circmap = CircMap::new(CircIdRange::High);

        let (control_tx, control_rx) = mpsc::unbounded();
        let (cell_tx, cell_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
        let closed = AtomicBool::new(false);
        // The timestamp is updated right away, so a fresh channel reports a
        // (small) unused duration rather than none at all.
        let unused_since = OptTimestamp::new();
        unused_since.update();

        let mutable = MutableDetails::default();
        let details = ChannelDetails {
            unique_id,
            peer_id,
            closed,
            unused_since,
            clock_skew,
            opened_at: coarsetime::Instant::now(),
            mutable: Mutex::new(mutable),
        };
        let details = Arc::new(details);

        let channel = Channel {
            control: control_tx,
            cell_tx,
            details: Arc::clone(&details),
        };

        let padding_timer = Box::pin(padding::Timer::new_disabled(sleep_prov, None));

        let reactor = Reactor {
            control: control_rx,
            cells: cell_rx,
            input: futures::StreamExt::fuse(stream),
            output: sink,
            circs: circmap,
            circ_unique_id_ctx: CircUniqIdContext::new(),
            link_protocol,
            details,
            padding_timer,
            special_outgoing: Default::default(),
        };

        (channel, reactor)
    }

    /// Return the [`UniqId`] assigned to this channel.
    pub fn unique_id(&self) -> UniqId {
        self.details.unique_id
    }

    /// Return the peer identity information recorded for this channel.
    pub fn target(&self) -> &OwnedChanTarget {
        &self.details.peer_id
    }

    /// Return the time elapsed since this channel was constructed.
    pub fn age(&self) -> Duration {
        self.details.opened_at.elapsed().into()
    }

    /// Return the [`ClockSkew`] recorded for this channel.
    pub fn clock_skew(&self) -> ClockSkew {
        self.details.clock_skew
    }

    /// Send a control message to the reactor.
    ///
    /// # Errors
    ///
    /// Returns [`ChannelClosed`] if the control queue refuses the message.
    fn send_control(&self, msg: CtrlMsg) -> StdResult<(), ChannelClosed> {
        self.control.unbounded_send(msg).map_err(|_| ChannelClosed)
    }

    /// Lock and return the mutable part of the channel details.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn mutable(&self) -> MutexGuard<'_, MutableDetails> {
        self.details
            .mutable
            .lock()
            .expect("channel details poisoned")
    }

    /// Engage padding on this channel, if that has not been done already.
    ///
    /// If the state is still `UsageDoesNotImplyPadding`, the accumulated
    /// parameters are sent to the reactor as a `ConfigUpdate` and the state
    /// becomes `PaddingConfigured`.  If the channel is closed (the send
    /// fails), the state is left unchanged.  If padding is already
    /// configured, this does nothing.
    ///
    /// # Panics
    ///
    /// Panics if the details mutex is poisoned.
    pub fn engage_padding_activities(&self) {
        let mut mutable = self.mutable();

        match &mutable.padding {
            PCS::UsageDoesNotImplyPadding {
                padding_params: params,
            } => {
                let mut params = params.clone();

                // A negotiation request equal to the default "start" message
                // is dropped from the update rather than forwarded.
                if params.padding_negotiate == Some(PaddingNegotiate::start_default()) {
                    params.padding_negotiate = None;
                }

                match self.send_control(CtrlMsg::ConfigUpdate(Arc::new(params))) {
                    Ok(()) => {}
                    // Channel is gone: nothing to configure.
                    Err(ChannelClosed) => return,
                }

                mutable.padding = PCS::PaddingConfigured;
            }

            PCS::PaddingConfigured => {
                // Already engaged; nothing to do.
            }
        }

        // The guard is held across the send above, so the state transition and
        // the control message are issued under the same lock.
        drop(mutable);
    }

    /// Apply updated padding parameters to this channel.
    ///
    /// If padding is already configured, the update is forwarded to the
    /// reactor as a `ConfigUpdate`.  Otherwise it is merged into the
    /// parameters held in the `UsageDoesNotImplyPadding` state, to be sent
    /// when [`engage_padding_activities`](Self::engage_padding_activities)
    /// is called.
    ///
    /// # Errors
    ///
    /// Returns an internal error if the details mutex is poisoned, or a
    /// channel-closed error if the update cannot be sent to the reactor.
    pub fn reparameterize(&self, params: Arc<ChannelPaddingInstructionsUpdates>) -> Result<()> {
        let mut mutable = self
            .details
            .mutable
            .lock()
            .map_err(|_| internal!("channel details poisoned"))?;

        match &mut mutable.padding {
            PCS::PaddingConfigured => {
                self.send_control(CtrlMsg::ConfigUpdate(params))?;
            }
            PCS::UsageDoesNotImplyPadding { padding_params } => {
                padding_params.combine(&params);
            }
        }

        // The guard is held across the send above, so updates are forwarded in
        // the order in which callers obtained the lock.
        drop(mutable);
        Ok(())
    }

    /// Check that every identity in `target` is also held, with the same
    /// value, by this channel's peer.
    ///
    /// # Errors
    ///
    /// Returns `Error::ChanMismatch` on the first identity that is missing or
    /// different.
    pub fn check_match<T: HasRelayIds + ?Sized>(&self, target: &T) -> Result<()> {
        check_id_match_helper(&self.details.peer_id, target)
    }

    /// Return true if the channel's `closed` flag is set.
    pub fn is_closing(&self) -> bool {
        self.details.closed.load(Ordering::SeqCst)
    }

    /// Return the time since the channel's `unused_since` timestamp was last
    /// updated, or `None` if the timestamp reports no such time.
    pub fn duration_unused(&self) -> Option<std::time::Duration> {
        self.details
            .unused_since
            .time_since_update()
            .map(Into::into)
    }

    /// Check whether `cell` is one that may be sent through this handle.
    ///
    /// # Errors
    ///
    /// Returns an internal error for `Created*` cells and for the handshake
    /// cell types (`Certs`, `Versions`, `Authenticate`, `Authorize`,
    /// `AuthChallenge`, `Netinfo`).  Every other cell is accepted.
    fn check_cell(&self, cell: &ChanCell) -> Result<()> {
        use msg::ChanMsg::*;
        let msg = cell.msg();
        match msg {
            Created(_) | Created2(_) | CreatedFast(_) => Err(Error::from(internal!(
                "Can't send {} cell on client channel",
                msg.cmd()
            ))),
            Certs(_) | Versions(_) | Authenticate(_) | Authorize(_) | AuthChallenge(_)
            | Netinfo(_) => Err(Error::from(internal!(
                "Can't send {} cell after handshake is done",
                msg.cmd()
            ))),
            _ => Ok(()),
        }
    }

    /// Non-`Pin` convenience wrapper around polling the cell queue for
    /// readiness.
    ///
    /// Returns `Ok(true)` if the queue is ready, `Ok(false)` if it is pending
    /// (in which case `cx`'s waker has been registered by the queue).
    ///
    /// # Errors
    ///
    /// Returns `Error::CircuitClosed` if the queue reports an error.
    // NOTE(review): the `Sink` implementation maps the same failure to
    // `ChannelClosed`; `CircuitClosed` is kept here because callers may match
    // on it — confirm which variant is intended.
    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Result<bool> {
        Ok(match Pin::new(&mut self.cell_tx).poll_ready(cx) {
            Poll::Ready(Ok(_)) => true,
            Poll::Ready(Err(_)) => return Err(Error::CircuitClosed),
            Poll::Pending => false,
        })
    }

    /// Send `cell` on this channel, waiting until it has been accepted.
    ///
    /// # Errors
    ///
    /// Propagates any error from the [`Sink`] implementation.
    pub async fn send_cell(&mut self, cell: ChanCell) -> Result<()> {
        self.send(cell).await
    }

    /// Allocate a new circuit on this channel.
    ///
    /// Asks the reactor (via an `AllocateCircuit` control message) for a
    /// circuit ID, then returns a pending circuit along with the circuit
    /// reactor that must be run for it.
    ///
    /// # Errors
    ///
    /// Returns a channel-closed error if the channel is closing, if the
    /// control message cannot be sent, or if the reactor drops the reply
    /// sender; also propagates any error the reactor replies with.
    pub async fn new_circ(
        &self,
    ) -> Result<(circuit::PendingClientCirc, circuit::reactor::Reactor)> {
        if self.is_closing() {
            return Err(ChannelClosed.into());
        }

        let (sender, receiver) = mpsc::channel(128);
        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();

        let (tx, rx) = oneshot::channel();
        self.send_control(CtrlMsg::AllocateCircuit {
            created_sender: createdsender,
            sender,
            tx,
        })?;
        // Outer error: reply sender dropped; inner error: reactor's own reply.
        let (id, circ_unique_id) = rx.await.map_err(|_| ChannelClosed)??;

        trace!("{}: Allocated CircId {}", circ_unique_id, id);

        Ok(circuit::PendingClientCirc::new(
            id,
            self.clone(),
            createdreceiver,
            receiver,
            circ_unique_id,
        ))
    }

    /// Ask the reactor to shut the channel down.
    ///
    /// A failure to deliver the request (channel already closed) is ignored.
    pub fn terminate(&self) {
        let _ = self.send_control(CtrlMsg::Shutdown);
    }

    /// Ask the reactor to close the circuit identified by `circid`.
    ///
    /// # Errors
    ///
    /// Returns a channel-closed error if the request cannot be sent.
    pub fn close_circuit(&self, circid: CircId) -> Result<()> {
        self.send_control(CtrlMsg::CloseCircuit(circid))?;
        Ok(())
    }

    /// Make a channel with no reactor, for use in tests.
    ///
    /// The returned receiver yields the control messages sent through the
    /// channel.  The receiving end of the cell queue is dropped immediately.
    #[cfg(feature = "testing")]
    pub fn new_fake() -> (Channel, mpsc::UnboundedReceiver<CtrlMsg>) {
        let (control, control_recv) = mpsc::unbounded();
        let details = fake_channel_details();

        let channel = Channel {
            control,
            cell_tx: mpsc::channel(CHANNEL_BUFFER_SIZE).0,
            details,
        };
        (channel, control_recv)
    }
}
/// Verify that `my_ident` holds every identity listed by `wanted_ident`.
///
/// Identities that `my_ident` has but `wanted_ident` does not list are
/// ignored.
///
/// # Errors
///
/// Returns `Error::ChanMismatch` for the first wanted identity that
/// `my_ident` either lacks or holds with a different value.
fn check_id_match_helper<T, U>(my_ident: &T, wanted_ident: &U) -> Result<()>
where
    T: HasRelayIds + ?Sized,
    U: HasRelayIds + ?Sized,
{
    for wanted in wanted_ident.identities() {
        let kind = wanted.id_type();

        // First make sure we have an identity of this type at all...
        let found = match my_ident.identity(kind) {
            Some(found) => found,
            None => {
                return Err(Error::ChanMismatch(format!(
                    "Peer does not have {} identity",
                    kind
                )));
            }
        };

        // ...then make sure it is the one that was asked for.
        if found != wanted {
            return Err(Error::ChanMismatch(format!(
                "Identity {} does not match target {}",
                found, wanted
            )));
        }
    }

    Ok(())
}
/// A channel exposes the relay identities of the peer it is connected to.
impl HasRelayIds for Channel {
    /// Look up the peer's identity of the requested `key_type`, if any.
    fn identity(
        &self,
        key_type: tor_linkspec::RelayIdType,
    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
        let peer = &self.details.peer_id;
        peer.identity(key_type)
    }
}
/// Build a [`ChannelDetails`] with fixed placeholder identities, for tests.
///
/// The peer gets an Ed25519 identity of all `6` bytes and an RSA identity of
/// all `10` bytes; the channel is not closed, has no clock skew, and its
/// unused-since timestamp is left un-updated.
#[cfg(any(test, feature = "testing"))]
fn fake_channel_details() -> Arc<ChannelDetails> {
    let unique_id = UniqId::new();
    let unused_since = OptTimestamp::new();

    let mut peer_builder = OwnedChanTarget::builder();
    peer_builder
        .ed_identity([6_u8; 32].into())
        .rsa_identity([10_u8; 20].into());
    let peer_id = peer_builder.build().expect("Couldn't construct peer id");

    let details = ChannelDetails {
        unique_id,
        peer_id,
        closed: AtomicBool::new(false),
        unused_since,
        clock_skew: ClockSkew::None,
        opened_at: coarsetime::Instant::now(),
        mutable: Default::default(),
    };
    Arc::new(details)
}
#[cfg(test)]
pub(crate) mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::channel::codec::test::MsgBuf;
    pub(crate) use crate::channel::reactor::test::new_reactor;
    use tor_cell::chancell::{msg, ChanCell};
    use tor_rtcompat::PreferredRuntime;

    /// Make a channel handle that is not attached to any reactor: the
    /// receiving ends of both queues are dropped immediately.
    pub(crate) fn fake_channel(details: Arc<ChannelDetails>) -> Channel {
        let control = mpsc::unbounded().0;
        let cell_tx = mpsc::channel(CHANNEL_BUFFER_SIZE).0;
        Channel {
            control,
            cell_tx,
            details,
        }
    }

    /// `check_cell` rejects cells a client must not send, and accepts others.
    #[test]
    fn send_bad() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            use std::error::Error;
            let chan = fake_channel(fake_channel_details());

            // A CREATED2 cell is never sent by a client.
            let created = ChanCell::new(7.into(), msg::Created2::new(&b"hihi"[..]).into());
            let outcome = chan.check_cell(&created);
            assert!(outcome.is_err());
            let text = format!("{}", outcome.unwrap_err().source().unwrap());
            assert!(text.contains("Can't send CREATED2 cell on client channel"));

            // A CERTS cell belongs to the handshake only.
            let certs = ChanCell::new(0.into(), msg::Certs::new_empty().into());
            let outcome = chan.check_cell(&certs);
            assert!(outcome.is_err());
            let text = format!("{}", outcome.unwrap_err().source().unwrap());
            assert!(text.contains("Can't send CERTS cell after handshake is done"));

            // A CREATE2 cell is fine.
            let create = ChanCell::new(5.into(), msg::Create2::new(2, &b"abc"[..]).into());
            let outcome = chan.check_cell(&create);
            assert!(outcome.is_ok());
        });
    }

    /// A builder with a declared method can launch a handshake.
    #[test]
    fn chanbuilder() {
        let rt = PreferredRuntime::create().unwrap();
        let mut builder = ChannelBuilder::default();
        let addr = "127.0.0.1:9001".parse().unwrap();
        builder.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![addr]));
        let tls = MsgBuf::new(&b""[..]);
        let _outbound = builder.launch(tls, rt);
    }

    /// `check_match` succeeds only for a target with the peer's identities.
    #[test]
    fn check_match() {
        let chan = fake_channel(fake_channel_details());

        // Build a target whose identities are filled with the given bytes.
        let target_with = |ed: u8, rsa: u8| {
            OwnedChanTarget::builder()
                .ed_identity([ed; 32].into())
                .rsa_identity([rsa; 20].into())
                .build()
                .unwrap()
        };

        let same = target_with(6, 10);
        let other = target_with(1, 3);
        let another = target_with(3, 2);

        assert!(chan.check_match(&same).is_ok());
        assert!(chan.check_match(&other).is_err());
        assert!(chan.check_match(&another).is_err());
    }

    /// Two separately created channels get different unique IDs.
    #[test]
    fn unique_id() {
        let first = fake_channel(fake_channel_details());
        let second = fake_channel(fake_channel_details());
        assert_ne!(first.unique_id(), second.unique_id());
    }

    /// Once the timestamp has been updated, an unused duration is reported.
    #[test]
    fn duration_unused_at() {
        let details = fake_channel_details();
        let chan = fake_channel(Arc::clone(&details));
        details.unused_since.update();
        let unused = chan.duration_unused();
        assert!(unused.is_some());
    }
}