use crate::client::circuit::handshake::RelayCryptLayerProtocol;
use crate::ccparams::CongestionControlParams;
use crate::circuit::CircParameters;
use crate::congestion::{CongestionControl, sendme};
use crate::stream::CloseStreamBehavior;
use crate::stream::SEND_WINDOW_INIT;
use crate::stream::StreamMpscReceiver;
use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
use crate::stream::flow_ctrl::params::FlowCtrlParameters;
use crate::stream::flow_ctrl::state::{StreamFlowCtrl, StreamRateLimit};
use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
use crate::stream::queue::StreamQueueSender;
use crate::streammap::{
self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut, StreamMap,
};
use crate::util::notify::NotifySender;
use crate::{Error, HopNum, Result};
use postage::watch;
use safelog::sensitive as sv;
use tracing::{trace, warn};
use tor_cell::chancell::BoxedCellBody;
use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
use tor_cell::relaycell::msg::AnyRelayMsg;
use tor_cell::relaycell::{
AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
StreamId, UnparsedRelayMsg,
};
use tor_error::{Bug, internal};
use tor_protover::named;
use std::num::NonZeroU32;
use std::pin::Pin;
use std::result::Result as StdResult;
use std::sync::{Arc, Mutex};
use web_time_compat::Instant;
#[cfg(test)]
use tor_cell::relaycell::msg::SendmeTag;
use cfg_if::cfg_if;
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum HopNegotiationType {
None,
HsV3,
Full,
}
#[derive(Clone, Debug)]
pub(crate) struct HopSettings {
pub(crate) ccontrol: CongestionControlParams,
pub(crate) flow_ctrl_params: FlowCtrlParameters,
pub(crate) n_incoming_cells_permitted: Option<u32>,
pub(crate) n_outgoing_cells_permitted: Option<u32>,
relay_crypt_protocol: RelayCryptLayerProtocol,
}
impl HopSettings {
#[allow(clippy::unnecessary_wraps)] pub(crate) fn from_params_and_caps(
hoptype: HopNegotiationType,
params: &CircParameters,
caps: &tor_protover::Protocols,
) -> Result<Self> {
let mut ccontrol = params.ccontrol.clone();
match ccontrol.alg() {
crate::ccparams::Algorithm::FixedWindow(_) => {}
crate::ccparams::Algorithm::Vegas(_) => {
if !caps.supports_named_subver(named::FLOWCTRL_CC) {
ccontrol.use_fallback_alg();
}
}
};
if hoptype == HopNegotiationType::None {
ccontrol.use_fallback_alg();
} else if hoptype == HopNegotiationType::HsV3 {
ccontrol.use_fallback_alg();
}
let ccontrol = ccontrol;
let relay_crypt_protocol = match hoptype {
HopNegotiationType::None => RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0),
HopNegotiationType::HsV3 => {
cfg_if! {
if #[cfg(feature = "hs-common")] {
RelayCryptLayerProtocol::HsV3(RelayCellFormat::V0)
} else {
return Err(
tor_error::internal!("Unexpectedly tried to negotiate HsV3 without support!").into(),
);
}
}
}
HopNegotiationType::Full => {
cfg_if! {
if #[cfg(all(feature = "flowctl-cc", feature = "counter-galois-onion"))] {
#[allow(clippy::overly_complex_bool_expr)]
if ccontrol.alg().compatible_with_cgo()
&& caps.supports_named_subver(named::RELAY_NEGOTIATE_SUBPROTO)
&& caps.supports_named_subver(named::RELAY_CRYPT_CGO)
{
RelayCryptLayerProtocol::Cgo
} else {
RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
}
} else {
RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
}
}
}
};
Ok(Self {
ccontrol,
flow_ctrl_params: params.flow_ctrl.clone(),
relay_crypt_protocol,
n_incoming_cells_permitted: params.n_incoming_cells_permitted,
n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
})
}
pub(crate) fn relay_crypt_protocol(&self) -> RelayCryptLayerProtocol {
self.relay_crypt_protocol
}
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn circuit_request_extensions(&self) -> Result<Vec<CircRequestExt>> {
#[allow(unused_mut)]
let mut client_extensions = Vec::new();
#[allow(unused, unused_mut)]
let mut cc_extension_set = false;
if self.ccontrol.is_enabled() {
cfg_if::cfg_if! {
if #[cfg(feature = "flowctl-cc")] {
client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
cc_extension_set = true;
} else {
return Err(
tor_error::internal!(
"Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
)
.into()
);
}
}
}
#[allow(unused_mut)]
let mut required_protocol_capabilities: Vec<tor_protover::NamedSubver> = Vec::new();
#[cfg(feature = "counter-galois-onion")]
if matches!(self.relay_crypt_protocol(), RelayCryptLayerProtocol::Cgo) {
if !cc_extension_set {
return Err(tor_error::internal!("Tried to negotiate CGO without CC.").into());
}
required_protocol_capabilities.push(tor_protover::named::RELAY_CRYPT_CGO);
}
if !required_protocol_capabilities.is_empty() {
client_extensions.push(CircRequestExt::SubprotocolRequest(
required_protocol_capabilities.into_iter().collect(),
));
}
Ok(client_extensions)
}
}
#[cfg(test)]
impl std::default::Default for CircParameters {
fn default() -> Self {
Self {
extend_by_ed25519_id: true,
ccontrol: crate::congestion::test_utils::params::build_cc_fixed_params(),
flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
n_incoming_cells_permitted: None,
n_outgoing_cells_permitted: None,
}
}
}
impl CircParameters {
pub fn new(
extend_by_ed25519_id: bool,
ccontrol: CongestionControlParams,
flow_ctrl: FlowCtrlParameters,
) -> Self {
Self {
extend_by_ed25519_id,
ccontrol,
flow_ctrl,
n_incoming_cells_permitted: None,
n_outgoing_cells_permitted: None,
}
}
}
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) struct SendRelayCell {
pub(crate) hop: Option<HopNum>,
pub(crate) early: bool,
pub(crate) cell: AnyRelayMsgOuter,
}
pub(crate) struct CircHopInbound {
decoder: RelayCellDecoder,
n_incoming_cells_permitted: Option<NonZeroU32>,
}
pub(crate) struct CircHopOutbound {
ccontrol: Arc<Mutex<CongestionControl>>,
map: Arc<Mutex<StreamMap>>,
relay_format: RelayCellFormat,
flow_ctrl_params: Arc<FlowCtrlParameters>,
n_outgoing_cells_permitted: Option<NonZeroU32>,
}
impl CircHopInbound {
pub(crate) fn new(decoder: RelayCellDecoder, settings: &HopSettings) -> Self {
Self {
decoder,
n_incoming_cells_permitted: settings.n_incoming_cells_permitted.map(cvt),
}
}
pub(crate) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
self.decoder
.decode(cell)
.map_err(|e| Error::from_bytes_err(e, "relay cell"))
}
pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
try_decrement_cell_limit(&mut self.n_incoming_cells_permitted)
.map_err(|_| Error::ExcessInboundCells)
}
}
impl CircHopOutbound {
pub(crate) fn new(
ccontrol: Arc<Mutex<CongestionControl>>,
relay_format: RelayCellFormat,
flow_ctrl_params: Arc<FlowCtrlParameters>,
settings: &HopSettings,
) -> Self {
Self {
ccontrol,
map: Arc::new(Mutex::new(StreamMap::new())),
relay_format,
flow_ctrl_params,
n_outgoing_cells_permitted: settings.n_outgoing_cells_permitted.map(cvt),
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn begin_stream(
&mut self,
hop: Option<HopNum>,
message: AnyRelayMsg,
sender: StreamQueueSender,
rx: StreamMpscReceiver<AnyRelayMsg>,
rate_limit_updater: watch::Sender<StreamRateLimit>,
drain_rate_requester: NotifySender<DrainRateRequest>,
cmd_checker: AnyCmdChecker,
) -> Result<(SendRelayCell, StreamId)> {
let flow_ctrl = self.build_flow_ctrl(
Arc::clone(&self.flow_ctrl_params),
rate_limit_updater,
drain_rate_requester,
)?;
let r =
self.map
.lock()
.expect("lock poisoned")
.add_ent(sender, rx, flow_ctrl, cmd_checker)?;
let cell = AnyRelayMsgOuter::new(Some(r), message);
Ok((
SendRelayCell {
hop,
early: false,
cell,
},
r,
))
}
pub(crate) fn close_stream(
&mut self,
circ_id: impl std::fmt::Display,
id: StreamId,
hop: Option<HopNum>,
message: CloseStreamBehavior,
why: streammap::TerminateReason,
expiry: Instant,
) -> Result<Option<SendRelayCell>> {
let should_send_end = self
.map
.lock()
.expect("lock poisoned")
.terminate(id, why, expiry)?;
trace!(
circ_id = %circ_id,
stream_id = %id,
should_send_end = ?should_send_end,
"Ending stream",
);
if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
(should_send_end, message)
{
let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
let cell = SendRelayCell {
hop,
early: false,
cell: end_cell,
};
return Ok(Some(cell));
}
Ok(None)
}
pub(crate) fn maybe_send_xon(
&mut self,
rate: XonKbpsEwma,
id: StreamId,
) -> Result<Option<Xon>> {
if !self
.ccontrol()
.lock()
.expect("poisoned lock")
.uses_xon_xoff()
{
return Ok(None);
}
let mut map = self.map.lock().expect("lock poisoned");
let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
return Ok(None);
};
ent.maybe_send_xon(rate)
}
pub(crate) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
if !self
.ccontrol()
.lock()
.expect("poisoned lock")
.uses_xon_xoff()
{
return Ok(None);
}
let mut map = self.map.lock().expect("lock poisoned");
let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
return Ok(None);
};
ent.maybe_send_xoff()
}
pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
self.relay_format
}
#[cfg(test)]
pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
self.ccontrol()
.lock()
.expect("poisoned lock")
.send_window_and_expected_tags()
}
pub(crate) fn n_open_streams(&self) -> usize {
self.map.lock().expect("lock poisoned").n_open_streams()
}
pub(crate) fn ccontrol(&self) -> &Arc<Mutex<CongestionControl>> {
&self.ccontrol
}
pub(crate) fn about_to_send(
&mut self,
circ_id: impl std::fmt::Display,
stream_id: StreamId,
msg: &AnyRelayMsg,
) -> Result<()> {
let mut hop_map = self.map.lock().expect("lock poisoned");
let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
warn!(
circ_id = %circ_id,
stream_id = %stream_id,
"sending a relay cell for non-existent or non-open stream!",
);
return Err(Error::CircProto(format!(
"tried to send a relay cell on non-open stream {}",
sv(stream_id),
)));
};
ent.about_to_send(msg)
}
#[cfg(any(feature = "hs-service", feature = "relay"))]
pub(crate) fn add_ent_with_id(
&self,
sink: StreamQueueSender,
rx: StreamMpscReceiver<AnyRelayMsg>,
rate_limit_updater: watch::Sender<StreamRateLimit>,
drain_rate_requester: NotifySender<DrainRateRequest>,
stream_id: StreamId,
cmd_checker: AnyCmdChecker,
) -> Result<()> {
let mut hop_map = self.map.lock().expect("lock poisoned");
hop_map.add_ent_with_id(
sink,
rx,
self.build_flow_ctrl(
Arc::clone(&self.flow_ctrl_params),
rate_limit_updater,
drain_rate_requester,
)?,
stream_id,
cmd_checker,
)?;
Ok(())
}
#[cfg_attr(feature = "flowctl-cc", expect(clippy::unnecessary_wraps))]
fn build_flow_ctrl(
&self,
params: Arc<FlowCtrlParameters>,
rate_limit_updater: watch::Sender<StreamRateLimit>,
drain_rate_requester: NotifySender<DrainRateRequest>,
) -> Result<StreamFlowCtrl> {
if self
.ccontrol()
.lock()
.expect("poisoned lock")
.uses_stream_sendme()
{
let window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
Ok(StreamFlowCtrl::new_window(window))
} else {
cfg_if::cfg_if! {
if #[cfg(feature = "flowctl-cc")] {
let use_sidechannel_mitigations = true;
Ok(StreamFlowCtrl::new_xon_xoff(
params,
use_sidechannel_mitigations,
rate_limit_updater,
drain_rate_requester,
))
} else {
drop(params);
drop(rate_limit_updater);
drop(drain_rate_requester);
Err(internal!(
"`CongestionControl` doesn't use sendmes, but 'flowctl-cc' feature not enabled",
).into())
}
}
}
}
fn deliver_msg_to_stream(
streamid: StreamId,
ent: &mut OpenStreamEnt,
cell_counts_toward_windows: bool,
msg: UnparsedRelayMsg,
) -> Result<bool> {
use tor_async_utils::SinkTrySend as _;
use tor_async_utils::SinkTrySendError as _;
match msg.cmd() {
RelayCmd::SENDME => {
ent.put_for_incoming_sendme(msg)?;
return Ok(false);
}
RelayCmd::XON => {
ent.handle_incoming_xon(msg)?;
return Ok(false);
}
RelayCmd::XOFF => {
ent.handle_incoming_xoff(msg)?;
return Ok(false);
}
_ => {}
}
let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
if e.is_full() {
cfg_if::cfg_if! {
if #[cfg(not(feature = "flowctl-cc"))] {
return Err(Error::CircProto(format!(
"Stream sink would block; received too many cells on stream ID {}",
sv(streamid),
)));
} else {
return Err(internal!(
"Stream (ID {}) uses an unbounded queue, but apparently it's full?",
sv(streamid),
)
.into());
}
}
}
if e.is_disconnected() && cell_counts_toward_windows {
ent.dropped += 1;
}
}
Ok(message_closes_stream)
}
#[cfg(feature = "hs-service")]
pub(crate) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
let mut hop_map = self.map.lock().expect("lock poisoned");
hop_map.ending_msg_received(stream_id)?;
Ok(())
}
pub(crate) fn handle_msg<F>(
&self,
possible_proto_violation_err: F,
cell_counts_toward_windows: bool,
streamid: StreamId,
msg: UnparsedRelayMsg,
now: Instant,
) -> Result<Option<UnparsedRelayMsg>>
where
F: FnOnce(StreamId) -> Error,
{
let mut hop_map = self.map.lock().expect("lock poisoned");
match hop_map.get_mut(streamid) {
Some(StreamEntMut::Open(ent)) => {
let message_closes_stream =
Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;
if message_closes_stream {
hop_map.ending_msg_received(streamid)?;
}
}
Some(StreamEntMut::EndSent(EndSentStreamEnt { expiry, .. })) if now >= *expiry => {
return Err(possible_proto_violation_err(streamid));
}
#[cfg(feature = "hs-service")]
Some(StreamEntMut::EndSent(_))
if matches!(
msg.cmd(),
RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
) =>
{
hop_map.ending_msg_received(streamid)?;
return Ok(Some(msg));
}
Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
match half_stream.handle_msg(msg)? {
StreamStatus::Open => {}
StreamStatus::Closed => {
hop_map.ending_msg_received(streamid)?;
}
}
}
#[cfg(feature = "hs-service")]
None if matches!(
msg.cmd(),
RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
) =>
{
return Ok(Some(msg));
}
_ => {
return Err(possible_proto_violation_err(streamid));
}
}
Ok(None)
}
pub(crate) fn stream_map(&self) -> &Arc<Mutex<StreamMap>> {
&self.map
}
pub(crate) fn set_stream_map(&mut self, map: Arc<Mutex<StreamMap>>) -> StdResult<(), Bug> {
if self.n_open_streams() != 0 {
return Err(internal!("Tried to discard existing open streams?!"));
}
self.map = map;
Ok(())
}
pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
try_decrement_cell_limit(&mut self.n_outgoing_cells_permitted)
.map_err(|_| Error::ExcessOutboundCells)
}
}
#[inline]
fn try_decrement_cell_limit(val: &mut Option<NonZeroU32>) -> StdResult<(), ()> {
match val {
Some(x) => {
let z = u32::from(*x);
if z == 1 {
Err(())
} else {
*x = (z - 1).try_into().expect("NonZeroU32 was zero?!");
Ok(())
}
}
None => Ok(()),
}
}
fn cvt(limit: u32) -> NonZeroU32 {
limit
.saturating_add(1)
.try_into()
.expect("Adding one left it as zero?")
}