use std::sync::Arc;
use postage::watch;
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
use tor_cell::relaycell::msg::AnyRelayMsg;
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
use super::params::FlowCtrlParameters;
use super::window::state::WindowFlowCtrl;
use super::xon_xoff::reader::DrainRateRequest;
#[cfg(feature = "flowctl-cc")]
use super::xon_xoff::state::XonXoffFlowCtrl;
use crate::Result;
use crate::congestion::sendme;
use crate::util::notify::NotifySender;
#[enum_dispatch::enum_dispatch]
#[derive(Debug)]
enum StreamFlowCtrlEnum {
WindowBased(WindowFlowCtrl),
#[cfg(feature = "flowctl-cc")]
XonXoffBased(XonXoffFlowCtrl),
}
#[derive(Debug)]
pub(crate) struct StreamFlowCtrl {
e: StreamFlowCtrlEnum,
}
impl StreamFlowCtrl {
pub(crate) fn new_window(window: sendme::StreamSendWindow) -> Self {
Self {
e: StreamFlowCtrlEnum::WindowBased(WindowFlowCtrl::new(window)),
}
}
#[cfg(feature = "flowctl-cc")]
pub(crate) fn new_xon_xoff(
params: Arc<FlowCtrlParameters>,
use_sidechannel_mitigations: bool,
rate_limit_updater: watch::Sender<StreamRateLimit>,
drain_rate_requester: NotifySender<DrainRateRequest>,
) -> Self {
Self {
e: StreamFlowCtrlEnum::XonXoffBased(XonXoffFlowCtrl::new(
params,
use_sidechannel_mitigations,
rate_limit_updater,
drain_rate_requester,
)),
}
}
}
impl FlowCtrlHooks for StreamFlowCtrl {
fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
self.e.can_send(msg)
}
fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
self.e.about_to_send(msg)
}
fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
self.e.put_for_incoming_sendme(msg)
}
fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
self.e.handle_incoming_xon(msg)
}
fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
self.e.handle_incoming_xoff(msg)
}
fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>> {
self.e.maybe_send_xon(rate, buffer_len)
}
fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>> {
self.e.maybe_send_xoff(buffer_len)
}
}
#[enum_dispatch::enum_dispatch(StreamFlowCtrlEnum)]
pub(crate) trait FlowCtrlHooks {
fn can_send<M: RelayMsg>(&self, msg: &M) -> bool;
fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()>;
fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()>;
fn maybe_send_xon(&mut self, rate: XonKbpsEwma, buffer_len: usize) -> Result<Option<Xon>>;
fn maybe_send_xoff(&mut self, buffer_len: usize) -> Result<Option<Xoff>>;
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct StreamRateLimit {
rate: u64,
}
impl StreamRateLimit {
pub(crate) const MAX: Self = Self::new_bytes_per_sec(u64::MAX);
pub(crate) const ZERO: Self = Self::new_bytes_per_sec(0);
pub(crate) const fn new_bytes_per_sec(rate: u64) -> Self {
Self { rate }
}
pub(crate) const fn bytes_per_sec(&self) -> u64 {
self.rate
}
}
impl std::fmt::Display for StreamRateLimit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} bytes/s", self.rate)
}
}