pub(crate) mod cmdcheck;
pub(crate) mod flow_ctrl;
pub(crate) mod raw;
#[cfg(any(feature = "hs-service", feature = "relay"))]
pub(crate) mod incoming;
pub(crate) mod queue;
use futures::SinkExt as _;
use oneshot_fused_workaround as oneshot;
use postage::watch;
use safelog::sensitive;
use tor_async_utils::SinkCloseChannel as _;
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
use tor_cell::relaycell::msg::{AnyRelayMsg, End};
use tor_cell::relaycell::{RelayCellFormat, StreamId, UnparsedRelayMsg};
use tor_memquota::mq_queue::{self, MpscSpec};
use flow_ctrl::state::StreamRateLimit;
use crate::memquota::StreamAccount;
use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
use crate::stream::raw::StreamReceiver;
use crate::{ClientTunnel, Error, HopLocation, Result};
use std::pin::Pin;
use std::sync::Arc;
/// Starting value of the send window.
///
/// NOTE(review): presumably the per-stream outbound flow-control window;
/// the code that consumes this constant is not in this file.
pub(crate) const SEND_WINDOW_INIT: u16 = 500;

/// Starting value of the receive window.
///
/// NOTE(review): presumably the per-stream inbound flow-control window;
/// the code that consumes this constant is not in this file.
pub(crate) const RECV_WINDOW_INIT: u16 = 500;

/// Capacity used for a stream reader's buffer: exactly twice
/// [`RECV_WINDOW_INIT`].
///
/// The window is widened to `usize` before doubling so that the
/// arithmetic is carried out in the destination type.
pub(crate) const STREAM_READER_BUFFER: usize = 2 * (RECV_WINDOW_INIT as usize);
/// Shorthand for the sending half of an `mq_queue` channel of `T`,
/// parameterized with `MpscSpec`.
///
/// NOTE(review): `mq_queue` comes from `tor_memquota`, so this is presumably
/// a memory-quota-tracked channel — confirm against that crate's docs.
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
/// Shorthand for the receiving half of an `mq_queue` channel of `T`,
/// parameterized with `MpscSpec`; the counterpart of [`StreamMpscSender`].
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
/// Selects what, if anything, is sent when a stream is closed.
///
/// A value of this type is handed to `StreamTarget::close_pending` as its
/// `message` argument and forwarded to the tunnel.
///
/// NOTE(review): how each variant is acted upon is decided by code outside
/// this file; the descriptions below follow the variant names.
#[derive(Clone, Debug)]
pub(crate) enum CloseStreamBehavior {
/// Close without sending a message.
SendNothing,
/// Close by sending the contained `End` message.
SendEnd(End),
}
impl Default for CloseStreamBehavior {
fn default() -> Self {
Self::SendEnd(End::new_misc())
}
}
/// The pieces that together make up one stream, grouped into a single value.
///
/// NOTE(review): where this is constructed and consumed is not visible in
/// this file.
#[derive(Debug)]
pub(crate) struct StreamComponents {
/// Receiving side of the stream (see `crate::stream::raw::StreamReceiver`).
pub(crate) stream_receiver: StreamReceiver,
/// Handle used to act on the stream through its tunnel
/// (see [`StreamTarget`]).
pub(crate) target: StreamTarget,
/// Memory-quota account associated with this stream.
///
/// NOTE(review): presumably the account the stream's queues are charged
/// to — confirm against `crate::memquota`.
pub(crate) memquota: StreamAccount,
/// Control handle for the reader side of XON/XOFF flow control.
pub(crate) xon_xoff_reader_ctrl: XonXoffReaderCtrl,
}
/// Handle for acting on a single stream: sending messages on it and asking
/// the tunnel that carries it to perform stream-level operations.
///
/// The type is `Clone`, so several handles may refer to the same stream.
#[derive(Clone, Debug)]
pub(crate) struct StreamTarget {
/// Hop location passed along with `stream_id` on client-tunnel calls.
/// Relay-tunnel calls in this file do not use it.
pub(crate) hop: Option<HopLocation>,
/// Identifier of the stream; passed to every per-stream tunnel call.
pub(crate) stream_id: StreamId,
/// Relay cell format associated with this stream; exposed through
/// `relay_cell_format()`.
pub(crate) relay_cell_format: RelayCellFormat,
/// Watch receiver carrying the stream's `StreamRateLimit`; exposed through
/// `rate_limit_stream()`.
pub(crate) rate_limit_stream: watch::Receiver<StreamRateLimit>,
/// Outbound channel for relay messages: `send()` pushes into it and
/// `close()` closes it.
pub(crate) tx: StreamMpscSender<AnyRelayMsg>,
/// The tunnel this stream runs over; selects which implementation the
/// per-stream operations are dispatched to.
pub(crate) tunnel: Tunnel,
}
/// The kind of tunnel a stream is attached to.
///
/// Each variant holds a shared (`Arc`) handle, so cloning a `Tunnel` clones
/// the handle rather than the tunnel itself. `derive_more::From` is derived,
/// which provides `From` conversions from each variant's payload type.
#[derive(Debug, Clone, derive_more::From)]
pub(crate) enum Tunnel {
/// A client-side tunnel.
Client(Arc<ClientTunnel>),
/// A relay circuit. Only compiled in with the `relay` feature.
#[cfg(feature = "relay")]
Relay(Arc<crate::relay::RelayCirc>),
}
impl StreamTarget {
pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
Ok(())
}
#[cfg(any(feature = "hs-service", feature = "relay"))]
pub(crate) fn close_pending(
&self,
message: crate::stream::CloseStreamBehavior,
) -> Result<oneshot::Receiver<Result<()>>> {
match &self.tunnel {
Tunnel::Client(t) => {
cfg_if::cfg_if! {
if #[cfg(feature = "hs-service")] {
t.close_pending(self.stream_id, self.hop, message)
} else {
Err(tor_error::internal!("close_pending() called on client stream?!").into())
}
}
}
#[cfg(feature = "relay")]
Tunnel::Relay(t) => t.close_pending(self.stream_id, message),
}
}
pub(crate) fn close(&mut self) {
Pin::new(&mut self.tx).close_channel();
}
pub(crate) fn protocol_error(&mut self) {
match &self.tunnel {
Tunnel::Client(t) => t.terminate(),
#[cfg(feature = "relay")]
Tunnel::Relay(t) => t.terminate(),
}
}
pub(crate) fn send_sendme(&mut self) -> Result<()> {
match &self.tunnel {
Tunnel::Client(t) => t.send_sendme(self.stream_id, self.hop),
#[cfg(feature = "relay")]
Tunnel::Relay(t) => t.send_sendme(self.stream_id),
}
}
pub(crate) fn drain_rate_update(&mut self, rate: XonKbpsEwma) -> Result<()> {
match &mut self.tunnel {
Tunnel::Client(t) => t.drain_rate_update(self.stream_id, self.hop, rate),
#[cfg(feature = "relay")]
Tunnel::Relay(t) => t.drain_rate_update(self.stream_id, rate),
}
}
#[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
pub(crate) fn tunnel(&self) -> &Tunnel {
&self.tunnel
}
pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
self.relay_cell_format
}
pub(crate) fn rate_limit_stream(&self) -> &watch::Receiver<StreamRateLimit> {
&self.rate_limit_stream
}
}
pub(crate) fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
let cmd = msg.cmd();
let streamid = msg.stream_id();
if !cmd.accepts_streamid_val(streamid) {
return Err(Error::CircProto(format!(
"Invalid stream ID {} for relay command {}",
sensitive(StreamId::get_or_zero(streamid)),
msg.cmd()
)));
}
Ok(streamid)
}