use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
use tor_cell::relaycell::{RelayCmd, RelayMsg, UnparsedRelayMsg};
use crate::congestion::sendme::{
self, StreamRecvWindow, StreamSendWindow, cmd_counts_towards_windows,
};
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, HalfStreamFlowCtrlHooks};
use crate::stream::{RECV_WINDOW_INIT, STREAM_READER_BUFFER};
use crate::{Error, Result};
#[cfg(doc)]
use crate::stream::flow_ctrl::state::StreamFlowCtrl;
/// Flow-control state for a stream that uses SENDME windows.
///
/// Wraps the stream's send window. The [`FlowCtrlHooks`] implementation for this type
/// consults and updates that window, and rejects all XON/XOFF traffic.
#[derive(Debug)]
pub(crate) struct WindowFlowCtrl {
/// Send window: checked before sending, and given a `put()` for each incoming SENDME.
window: StreamSendWindow,
}
impl WindowFlowCtrl {
pub(crate) fn new(window: StreamSendWindow) -> Self {
Self { window }
}
}
impl FlowCtrlHooks for WindowFlowCtrl {
fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
!sendme::cmd_counts_towards_windows(msg.cmd()) || self.window.window() > 0
}
fn about_to_send(&mut self, msg: &AnyRelayMsg) -> Result<()> {
if sendme::cmd_counts_towards_windows(msg.cmd()) {
self.window.take().map(|_| ())
} else {
Ok(())
}
}
fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
let _sendme = msg
.decode::<Sendme>()
.map_err(|e| Error::from_bytes_err(e, "failed to decode stream sendme message"))?
.into_msg();
self.window.put()
}
fn handle_incoming_xon(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
let msg = "XON messages not allowed with window flow control";
Err(Error::CircProto(msg.into()))
}
fn handle_incoming_xoff(&mut self, _msg: UnparsedRelayMsg) -> Result<()> {
let msg = "XOFF messages not allowed with window flow control";
Err(Error::CircProto(msg.into()))
}
fn maybe_send_xon(&mut self, _rate: XonKbpsEwma, _buffer_len: usize) -> Result<Option<Xon>> {
let msg = "XON messages cannot be sent with window flow control";
Err(Error::CircProto(msg.into()))
}
fn maybe_send_xoff(&mut self, _buffer_len: usize) -> Result<Option<Xoff>> {
let msg = "XOFF messages cannot be sent with window flow control";
Err(Error::CircProto(msg.into()))
}
fn inbound_queue_max_len(&self) -> usize {
STREAM_READER_BUFFER
}
}
/// Window-based flow-control state driven through [`HalfStreamFlowCtrlHooks`].
///
/// Pairs a stream's [`WindowFlowCtrl`] with a receive window, so that incoming
/// messages can be validated against both.
#[derive(Debug)]
pub(crate) struct HalfStreamWindowFlowCtrl {
/// The stream's window flow-control state; incoming SENDME, XON and XOFF messages
/// are forwarded to it.
inner: WindowFlowCtrl,
/// Receive window, charged for dropped messages and for each incoming message
/// whose command counts towards windows.
recv_window: StreamRecvWindow,
}
impl HalfStreamWindowFlowCtrl {
pub(crate) fn new(flow_ctrl: WindowFlowCtrl) -> Self {
Self {
inner: flow_ctrl,
recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
}
}
}
impl HalfStreamFlowCtrlHooks for HalfStreamWindowFlowCtrl {
    /// Charges `msg_count` dropped messages against the receive window.
    ///
    /// Returns whatever the receive window's `decrement_n` returns.
    fn handle_incoming_dropped(&mut self, msg_count: u16) -> Result<()> {
        self.recv_window.decrement_n(msg_count)
    }

    /// Processes one incoming message.
    ///
    /// SENDME, XON and XOFF messages are consumed by the wrapped [`WindowFlowCtrl`]
    /// and yield `Ok(None)` when it accepts them. Every other message is handed back
    /// as `Ok(Some(msg))`; if its command counts towards windows, one unit is first
    /// taken from the receive window, and a failure there is returned as the error.
    fn handle_incoming_msg(&mut self, msg: UnparsedRelayMsg) -> Result<Option<UnparsedRelayMsg>> {
        match msg.cmd() {
            // Flow-control messages are consumed here and never passed on.
            RelayCmd::SENDME => self.inner.put_for_incoming_sendme(msg).map(|()| None),
            RelayCmd::XON => self.inner.handle_incoming_xon(msg).map(|()| None),
            RelayCmd::XOFF => self.inner.handle_incoming_xoff(msg).map(|()| None),
            // Everything else is returned to the caller, after window accounting if needed.
            other => {
                if cmd_counts_towards_windows(other) {
                    let _ = self.recv_window.take()?;
                }
                Ok(Some(msg))
            }
        }
    }
}