use std::net::SocketAddr;
use std::time::Duration;
use flume::TrySendError;
use parking_lot::Mutex;
use tokio_tungstenite::tungstenite::Message;
use crate::throttler::Throttler;
/// Single throttler shared by every `send_lossy` call in the process, built with a
/// 30-second duration and consulted before each "outbox ... full" log line.
// NOTE(review): presumably `try_acquire` returns true at most once per 30 s so the
// log is not flooded — confirm against `crate::throttler::Throttler`.
static THROTTLER: Mutex<Throttler> = Mutex::new(Throttler::new(Duration::from_secs(30)));
/// Outcome of a `send_lossy` call.
#[derive(Debug, Clone, Copy)]
pub(crate) enum SendLossyResult {
/// The message was enqueued on the first attempt; nothing was discarded.
Sent,
/// The message was enqueued only after room was made by discarding previously
/// queued message(s); carries the discard count tallied by `send_lossy`.
// NOTE(review): the payload appears to be read only by tests and `Debug` output,
// which is presumably why the dead-code lint is silenced — confirm.
#[allow(dead_code)]
SentLossy(usize),
/// The channel was still full once the permitted number of retries had been
/// used up; the new message was not enqueued.
ExhaustedRetries,
}
/// Enqueues `message` on a bounded outbox without blocking, discarding already
/// queued messages to make room when the outbox is full.
///
/// A non-blocking send is attempted first. Every time the channel reports that it
/// is full, one message is popped from the front of the queue via `rx` and the
/// send is retried, for at most `retries` such retries.
///
/// # Parameters
/// - `client_addr`: address of the client, used only in the log line.
/// - `tx` / `rx`: the two ends of the same bounded channel. Holding `rx` is what
///   lets this function evict queued messages and guarantees the channel cannot
///   be disconnected.
/// - `message`: the message to enqueue.
/// - `retries`: how many evict-and-retry rounds are allowed; `0` means the message
///   is given up on as soon as the channel is found full.
///
/// # Returns
/// - [`SendLossyResult::Sent`] if the message was enqueued and this call discarded
///   nothing.
/// - [`SendLossyResult::SentLossy`] with the number of messages this call actually
///   removed from the queue before the send succeeded.
/// - [`SendLossyResult::ExhaustedRetries`] if the channel was still full after
///   `retries` retries; `message` is dropped in that case.
///
/// The "outbox full" log line is emitted (subject to the shared throttler) whenever
/// at least one message was discarded or the retries were exhausted.
///
/// # Panics
/// Panics if the channel reports disconnection, which should be impossible while
/// the caller holds `rx`.
pub(crate) fn send_lossy(
    client_addr: &SocketAddr,
    tx: &flume::Sender<Message>,
    rx: &flume::Receiver<Message>,
    mut message: Message,
    retries: usize,
) -> SendLossyResult {
    // Both lossy outcomes report the same condition; gate the log line on the
    // shared throttler in one place.
    let log_outbox_full = || {
        if THROTTLER.lock().try_acquire() {
            tracing::info!("outbox for client {client_addr} full");
        }
    };

    // `attempts` bounds the loop; `dropped` counts only messages that were really
    // removed. They differ when a concurrent consumer empties the queue between
    // the failed send and the eviction, in which case nothing was lost.
    let mut attempts = 0;
    let mut dropped = 0;
    loop {
        match tx.try_send(message) {
            Ok(()) if dropped == 0 => return SendLossyResult::Sent,
            Ok(()) => {
                log_outbox_full();
                return SendLossyResult::SentLossy(dropped);
            }
            Err(TrySendError::Disconnected(_)) => unreachable!("we're holding rx"),
            Err(TrySendError::Full(rejected)) => {
                if attempts >= retries {
                    log_outbox_full();
                    return SendLossyResult::ExhaustedRetries;
                }
                // Take the rejected message back so the next iteration can resend it.
                message = rejected;
                // Evict one queued message; an empty queue here just means the
                // consumer got there first, so do not count it as a drop.
                if rx.try_recv().is_ok() {
                    dropped += 1;
                }
                attempts += 1;
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use tracing_test::traced_test;

    use super::*;

    /// Builds a text message whose payload is the decimal rendering of `id`.
    fn make_message(id: usize) -> Message {
        Message::Text(id.to_string().into())
    }

    /// Recovers the id from a text message built by `make_message`.
    fn parse_message(msg: Message) -> usize {
        if let Message::Text(payload) = msg {
            payload.parse().expect("id")
        } else {
            unreachable!()
        }
    }

    /// Fills a bounded channel, then checks that a zero-retry send gives up while a
    /// one-retry send evicts exactly one queued message, leaving only the newest
    /// `BACKLOG` ids in the queue.
    #[traced_test]
    #[test]
    fn test_send_lossy() {
        const BACKLOG: usize = 4;
        const TOTAL: usize = 10;

        let client = SocketAddr::from(([127, 0, 0, 1], 1234));
        let (sender, receiver) = flume::bounded(BACKLOG);

        // While there is still room, every send goes through untouched.
        for id in 0..BACKLOG {
            let outcome = send_lossy(&client, &sender, &receiver, make_message(id), 0);
            assert_matches!(outcome, SendLossyResult::Sent);
        }

        for id in BACKLOG..TOTAL {
            // No retries allowed: the message (tagged with an out-of-range id so it
            // would be noticed if it leaked through) must be rejected.
            let rejected = send_lossy(&client, &sender, &receiver, make_message(TOTAL + id), 0);
            assert_matches!(rejected, SendLossyResult::ExhaustedRetries);

            // One retry allowed: one queued message is evicted to make room.
            let squeezed_in = send_lossy(&client, &sender, &receiver, make_message(id), 1);
            assert_matches!(squeezed_in, SendLossyResult::SentLossy(1));
        }

        // Drain whatever is left without blocking.
        let received: Vec<usize> = std::iter::from_fn(|| receiver.try_recv().ok())
            .map(parse_message)
            .collect();
        let expected: Vec<usize> = ((TOTAL - BACKLOG)..TOTAL).collect();
        assert_eq!(received, expected);
    }
}