use super::circmap::{CircEnt, CircMap};
use crate::circuit::halfcirc::HalfCirc;
use crate::util::err::{ChannelClosed, ReactorError};
use crate::{Error, Result};
use tor_basic_utils::futures::SinkExt as _;
use tor_cell::chancell::msg::{Destroy, DestroyReason, PaddingNegotiate};
use tor_cell::chancell::{msg::ChanMsg, ChanCell, CircId};
use tor_rtcompat::SleepProvider;
use futures::channel::{mpsc, oneshot};
use futures::sink::SinkExt;
use futures::stream::Stream;
use futures::Sink;
use futures::StreamExt as _;
use futures::{select, select_biased};
use tor_error::internal;
use std::fmt;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::channel::{codec::CodecError, padding, params::*, unique_id, ChannelDetails};
use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
use tracing::{debug, trace};
pub(super) type BoxedChannelStream =
Box<dyn Stream<Item = std::result::Result<ChanCell, CodecError>> + Send + Unpin + 'static>;
pub(super) type BoxedChannelSink =
Box<dyn Sink<ChanCell, Error = CodecError> + Send + Unpin + 'static>;
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
fn codec_err_to_chan(err: CodecError) -> Error {
match err {
CodecError::Io(e) => crate::Error::ChanIoErr(Arc::new(e)),
CodecError::EncCell(err) => Error::from_cell_enc(err, "channel cell"),
CodecError::DecCell(err) => Error::from_cell_dec(err, "channel cell"),
}
}
#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
#[derive(Debug)]
#[allow(unreachable_pub)] #[allow(clippy::exhaustive_enums)]
pub enum CtrlMsg {
Shutdown,
CloseCircuit(CircId),
AllocateCircuit {
created_sender: oneshot::Sender<CreateResponse>,
sender: mpsc::Sender<ClientCircChanMsg>,
tx: ReactorResultChannel<(CircId, crate::circuit::UniqId)>,
},
ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
}
#[must_use = "If you don't call run() on a reactor, the channel won't work."]
pub struct Reactor<S: SleepProvider> {
pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
pub(super) cells: mpsc::Receiver<ChanCell>,
pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
pub(super) output: BoxedChannelSink,
pub(super) padding_timer: Pin<Box<padding::Timer<S>>>,
pub(super) special_outgoing: SpecialOutgoing,
pub(super) circs: CircMap,
pub(super) details: Arc<ChannelDetails>,
pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
#[allow(dead_code)] pub(super) link_protocol: u16,
}
#[derive(Default, Debug, Clone)]
pub(super) struct SpecialOutgoing {
pub(super) padding_negotiate: Option<PaddingNegotiate>,
}
impl SpecialOutgoing {
#[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
pub(super) fn next(&mut self) -> Option<ChanCell> {
if let Some(p) = self.padding_negotiate.take() {
return Some(p.into());
}
None
}
}
impl<S: SleepProvider> fmt::Display for Reactor<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.details.unique_id, f)
}
}
impl<S: SleepProvider> Reactor<S> {
pub async fn run(mut self) -> Result<()> {
if self.details.closed.load(Ordering::SeqCst) {
return Err(ChannelClosed.into());
}
debug!("{}: Running reactor", &self);
let result: Result<()> = loop {
match self.run_once().await {
Ok(()) => (),
Err(ReactorError::Shutdown) => break Ok(()),
Err(ReactorError::Err(e)) => break Err(e),
}
};
debug!("{}: Reactor stopped: {:?}", &self, result);
self.details.closed.store(true, Ordering::SeqCst);
result
}
async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
select! {
ret = self.output.prepare_send_from(async {
if let Some(l) = self.special_outgoing.next() {
self.padding_timer.as_mut().note_cell_sent();
return Some(l)
}
select_biased! {
n = self.cells.next() => {
self.padding_timer.as_mut().note_cell_sent();
n
},
p = self.padding_timer.as_mut().next() => {
Some(p.into())
},
}
}) => {
let (msg, sendable) = ret.map_err(codec_err_to_chan)?;
let msg = msg.ok_or(ReactorError::Shutdown)?;
sendable.send(msg).map_err(codec_err_to_chan)?;
}
ret = self.control.next() => {
let ctrl = match ret {
None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
Some(x) => x,
};
self.handle_control(ctrl).await?;
}
ret = self.input.next() => {
let item = ret
.ok_or(ReactorError::Shutdown)?
.map_err(codec_err_to_chan)?;
crate::note_incoming_traffic();
self.handle_cell(item).await?;
}
}
Ok(()) }
async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
trace!("{}: reactor received {:?}", &self, msg);
match msg {
CtrlMsg::Shutdown => panic!(), CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
CtrlMsg::AllocateCircuit {
created_sender,
sender,
tx,
} => {
let mut rng = rand::thread_rng();
let my_unique_id = self.details.unique_id;
let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
let ret: Result<_> = self
.circs
.add_ent(&mut rng, created_sender, sender)
.map(|id| (id, circ_unique_id));
let _ = tx.send(ret); self.update_disused_since();
}
CtrlMsg::ConfigUpdate(updates) => {
if self.link_protocol == 4 {
return Ok(());
}
let ChannelPaddingInstructionsUpdates {
padding_enable,
padding_parameters,
padding_negotiate,
} = &*updates;
if let Some(parameters) = padding_parameters {
self.padding_timer.as_mut().reconfigure(parameters);
}
if let Some(enable) = padding_enable {
if *enable {
self.padding_timer.as_mut().enable();
} else {
self.padding_timer.as_mut().disable();
}
}
if let Some(padding_negotiate) = padding_negotiate {
self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
}
}
}
Ok(())
}
async fn handle_cell(&mut self, cell: ChanCell) -> Result<()> {
let (circid, msg) = cell.into_circid_and_msg();
use ChanMsg::*;
match msg {
Relay(_) | Padding(_) | VPadding(_) => {} _ => trace!("{}: received {} for {}", &self, msg.cmd(), circid),
}
match msg {
Create(_) | CreateFast(_) | Create2(_) | RelayEarly(_) | PaddingNegotiate(_) => Err(
Error::ChanProto(format!("{} cell on client channel", msg.cmd())),
),
Created(_) => Err(Error::ChanProto(format!(
"{} cell received, but we never send CREATEs",
msg.cmd()
))),
Versions(_) | Certs(_) | Authorize(_) | Authenticate(_) | AuthChallenge(_)
| Netinfo(_) => Err(Error::ChanProto(format!(
"{} cell after handshake is done",
msg.cmd()
))),
Relay(_) => self.deliver_relay(circid, msg).await,
Destroy(_) => self.deliver_destroy(circid, msg).await,
CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg).await,
Padding(_) | VPadding(_) => Ok(()),
Unrecognized(_) => Ok(()),
_ => Ok(()),
}
}
async fn deliver_relay(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> {
let mut ent = self
.circs
.get_mut(circid)
.ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
match &mut *ent {
CircEnt::Open(s) => {
if s.send(msg.try_into()?).await.is_err() {
drop(ent);
self.outbound_destroy_circ(circid).await?;
}
Ok(())
}
CircEnt::Opening(_, _) => Err(Error::ChanProto(
"Relay cell on pending circuit before CREATED* received".into(),
)),
CircEnt::DestroySent(hs) => hs.receive_cell(),
}
}
async fn deliver_created(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> {
let target = self.circs.advance_from_opening(circid)?;
let created = msg.try_into()?;
target.send(created).map_err(|_| {
Error::from(internal!(
"Circuit queue rejected created message. Is it closing?"
))
})
}
async fn deliver_destroy(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> {
let entry = self.circs.remove(circid);
self.update_disused_since();
match entry {
Some(CircEnt::Opening(oneshot, _)) => {
trace!("{}: Passing destroy to pending circuit {}", &self, circid);
oneshot
.send(msg.try_into()?)
.map_err(|_| {
internal!("pending circuit wasn't interested in destroy cell?").into()
})
}
Some(CircEnt::Open(mut sink)) => {
trace!("{}: Passing destroy to open circuit {}", &self, circid);
sink.send(msg.try_into()?)
.await
.map_err(|_| {
internal!("open circuit wasn't interested in destroy cell?").into()
})
}
Some(CircEnt::DestroySent(_)) => Ok(()),
None => {
trace!("{}: Destroy for nonexistent circuit {}", &self, circid);
Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
}
}
}
async fn send_cell(&mut self, cell: ChanCell) -> Result<()> {
self.output.send(cell).await.map_err(codec_err_to_chan)?;
Ok(())
}
async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
trace!("{}: Circuit {} is gone; sending DESTROY", &self, id);
self.circs.destroy_sent(id, HalfCirc::new(3000));
self.update_disused_since();
let destroy = Destroy::new(DestroyReason::NONE).into();
let cell = ChanCell::new(id, destroy);
self.send_cell(cell).await?;
Ok(())
}
fn update_disused_since(&self) {
if self.circs.open_ent_count() == 0 {
self.details.unused_since.update_if_none();
} else {
self.details.unused_since.clear();
}
}
}
#[cfg(test)]
pub(crate) mod test {
#![allow(clippy::unwrap_used)]
use super::*;
use crate::channel::UniqId;
use crate::circuit::CircParameters;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::task::SpawnExt;
use tor_linkspec::OwnedChanTarget;
use tor_rtcompat::Runtime;
type CodecResult = std::result::Result<ChanCell, CodecError>;
pub(crate) fn new_reactor<R: Runtime>(
runtime: R,
) -> (
crate::channel::Channel,
Reactor<R>,
mpsc::Receiver<ChanCell>,
mpsc::Sender<CodecResult>,
) {
let link_protocol = 4;
let (send1, recv1) = mpsc::channel(32);
let (send2, recv2) = mpsc::channel(32);
let unique_id = UniqId::new();
let dummy_target = OwnedChanTarget::builder()
.ed_identity([6; 32].into())
.rsa_identity([10; 20].into())
.build()
.unwrap();
let send1 = send1.sink_map_err(|e| {
trace!("got sink error: {}", e);
CodecError::DecCell(tor_cell::Error::ChanProto("dummy message".into()))
});
let (chan, reactor) = crate::channel::Channel::new(
link_protocol,
Box::new(send1),
Box::new(recv2),
unique_id,
dummy_target,
crate::ClockSkew::None,
runtime,
);
(chan, reactor, recv1, send2)
}
#[test]
fn shutdown() {
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
let (chan, mut reactor, _output, _input) = new_reactor(rt);
chan.terminate();
let r = reactor.run_once().await;
assert!(matches!(r, Err(ReactorError::Shutdown)));
});
}
#[test]
fn shutdown2() {
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
use futures::future::FutureExt;
use futures::join;
let (chan, reactor, _output, _input) = new_reactor(rt);
let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
let rr = run_reactor.clone();
let exit_then_check = async {
assert!(rr.peek().is_none());
chan.terminate();
};
let (rr_s, _) = join!(run_reactor, exit_then_check);
assert!(rr_s);
});
}
#[test]
fn new_circ_closed() {
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
assert!(chan.duration_unused().is_some());
let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
let (pending, circr) = ret.unwrap();
rt.spawn(async {
let _ignore = circr.run().await;
})
.unwrap();
assert!(reac.is_ok());
let id = pending.peek_circid();
let ent = reactor.circs.get_mut(id);
assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
assert!(chan.duration_unused().is_none());
drop(pending);
reactor.run_once().await.unwrap();
let ent = reactor.circs.get_mut(id);
assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
let cell = output.next().await.unwrap();
assert_eq!(cell.circid(), id);
assert!(matches!(cell.msg(), ChanMsg::Destroy(_)));
assert!(chan.duration_unused().is_some()); });
}
#[test]
#[ignore] fn new_circ_create_failure() {
use std::time::Duration;
use tor_rtcompat::SleepProvider;
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
use tor_cell::chancell::msg;
let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());
let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
let (pending, circr) = ret.unwrap();
rt.spawn(async {
let _ignore = circr.run().await;
})
.unwrap();
assert!(reac.is_ok());
let circparams = CircParameters::default();
let id = pending.peek_circid();
let ent = reactor.circs.get_mut(id);
assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
#[allow(clippy::clone_on_copy)]
let rtc = rt.clone();
let send_response = async {
rtc.sleep(Duration::from_millis(100)).await;
trace!("sending createdfast");
let created_cell = ChanCell::new(id, msg::CreatedFast::new(*b"x").into());
input.send(Ok(created_cell)).await.unwrap();
reactor.run_once().await.unwrap();
};
let (circ, _) =
futures::join!(pending.create_firsthop_fast(&circparams), send_response);
assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));
reactor.run_once().await.unwrap();
let cell_sent = output.next().await.unwrap();
assert!(matches!(cell_sent.msg(), msg::ChanMsg::CreateFast(_)));
let ent = reactor.circs.get_mut(id);
assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
});
}
#[test]
fn bad_cells() {
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
use tor_cell::chancell::msg;
let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
let create_cell = msg::Create2::new(4, *b"hihi").into();
input
.send(Ok(ChanCell::new(9.into(), create_cell)))
.await
.unwrap();
let created2_cell = msg::Created2::new(*b"hihi").into();
input
.send(Ok(ChanCell::new(7.into(), created2_cell)))
.await
.unwrap();
let e = reactor.run_once().await.unwrap_err().unwrap_err();
assert_eq!(
format!("{}", e),
"Channel protocol violation: CREATE2 cell on client channel"
);
let e = reactor.run_once().await.unwrap_err().unwrap_err();
assert_eq!(
format!("{}", e),
"Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
);
let relay_cell = msg::Relay::new(b"abc").into();
input
.send(Ok(ChanCell::new(4.into(), relay_cell)))
.await
.unwrap();
let e = reactor.run_once().await.unwrap_err().unwrap_err();
assert_eq!(
format!("{}", e),
"Channel protocol violation: Relay cell on nonexistent circuit"
);
let versions_cell = msg::Versions::new([3]).unwrap().into();
input
.send(Ok(ChanCell::new(0.into(), versions_cell)))
.await
.unwrap();
let e = reactor.run_once().await.unwrap_err().unwrap_err();
assert_eq!(
format!("{}", e),
"Channel protocol violation: VERSIONS cell after handshake is done"
);
let created_cell = msg::Created::new(&b"xyzzy"[..]).into();
input
.send(Ok(ChanCell::new(25.into(), created_cell)))
.await
.unwrap();
let e = reactor.run_once().await.unwrap_err().unwrap_err();
assert_eq!(
format!("{}", e),
"Channel protocol violation: CREATED cell received, but we never send CREATEs"
);
});
}
#[test]
fn deliver_relay() {
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
use crate::circuit::celltypes::ClientCircChanMsg;
use futures::channel::oneshot;
use tor_cell::chancell::msg;
let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
let (_circ_stream_7, mut circ_stream_13) = {
let (snd1, _rcv1) = oneshot::channel();
let (snd2, rcv2) = mpsc::channel(64);
reactor
.circs
.put_unchecked(7.into(), CircEnt::Opening(snd1, snd2));
let (snd3, rcv3) = mpsc::channel(64);
reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3));
reactor
.circs
.put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25)));
(rcv2, rcv3)
};
let relaycell: ChanMsg = msg::Relay::new(b"do you suppose").into();
input
.send(Ok(ChanCell::new(13.into(), relaycell.clone())))
.await
.unwrap();
reactor.run_once().await.unwrap();
let got = circ_stream_13.next().await.unwrap();
assert!(matches!(got, ClientCircChanMsg::Relay(_)));
input
.send(Ok(ChanCell::new(7.into(), relaycell.clone())))
.await
.unwrap();
let e = reactor.run_once().await.unwrap_err().unwrap_err();
assert_eq!(
format!("{}", e),
"Channel protocol violation: Relay cell on pending circuit before CREATED* received"
);
input
.send(Ok(ChanCell::new(101.into(), relaycell.clone())))
.await
.unwrap();
let e = reactor.run_once().await.unwrap_err().unwrap_err();
assert_eq!(
format!("{}", e),
"Channel protocol violation: Relay cell on nonexistent circuit"
);
for _ in 0..25 {
input
.send(Ok(ChanCell::new(23.into(), relaycell.clone())))
.await
.unwrap();
reactor.run_once().await.unwrap(); }
input
.send(Ok(ChanCell::new(23.into(), relaycell.clone())))
.await
.unwrap();
let e = reactor.run_once().await.unwrap_err().unwrap_err();
assert_eq!(
format!("{}", e),
"Channel protocol violation: Too many cells received on destroyed circuit"
);
});
}
#[test]
fn deliver_destroy() {
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
use crate::circuit::celltypes::*;
use futures::channel::oneshot;
use tor_cell::chancell::msg;
let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
let (circ_oneshot_7, mut circ_stream_13) = {
let (snd1, rcv1) = oneshot::channel();
let (snd2, _rcv2) = mpsc::channel(64);
reactor
.circs
.put_unchecked(7.into(), CircEnt::Opening(snd1, snd2));
let (snd3, rcv3) = mpsc::channel(64);
reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3));
reactor
.circs
.put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25)));
(rcv1, rcv3)
};
let destroycell: ChanMsg = msg::Destroy::new(0.into()).into();
input
.send(Ok(ChanCell::new(7.into(), destroycell.clone())))
.await
.unwrap();
reactor.run_once().await.unwrap();
let msg = circ_oneshot_7.await;
assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
input
.send(Ok(ChanCell::new(13.into(), destroycell.clone())))
.await
.unwrap();
reactor.run_once().await.unwrap();
let msg = circ_stream_13.next().await.unwrap();
assert!(matches!(msg, ClientCircChanMsg::Destroy(_)));
input
.send(Ok(ChanCell::new(23.into(), destroycell.clone())))
.await
.unwrap();
reactor.run_once().await.unwrap();
input
.send(Ok(ChanCell::new(101.into(), destroycell.clone())))
.await
.unwrap();
let e = reactor.run_once().await.unwrap_err().unwrap_err();
assert_eq!(
format!("{}", e),
"Channel protocol violation: Destroy for nonexistent circuit"
);
});
}
}