use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use async_event::Event;
use base64::{prelude::BASE64_STANDARD_NO_PAD, Engine};
use futures_concurrency::future::Race;
use futures_util::{AsyncReadExt, AsyncWriteExt};
use stdcode::StdcodeSerializeExt;
use crate::{bw_token::bw_token_consume, Config};
use anyctx::AnyCtx;
/// Runs the client side of bandwidth accounting over a single `picomux` stream.
///
/// Two halves run concurrently, and the function returns as soon as either finishes:
///
/// * The **reader** repeatedly takes an 8-byte big-endian integer off the stream and
///   records it as the number of bandwidth bytes left (the trace log calls this the
///   "remote bw"), then wakes the writer.
/// * The **writer** sleeps until the recorded value is below a threshold. It then obtains
///   a `(token, sig)` pair from [`bw_token_consume`], serializes it with `stdcode`,
///   encodes it as unpadded standard base64, and writes it to the stream as one
///   newline-terminated line. After each token it pauses for 200 ms before checking again.
///
/// The threshold starts at 1,000,000 bytes. Every time the writer wakes up and sees that
/// the recorded value is exactly zero, the threshold grows by 1,000,000, capped at
/// 100,000,000.
///
/// # Errors
///
/// Both halves loop forever, so this only returns when one of them fails: a read or
/// write error on the stream, or an error from [`bw_token_consume`].
#[tracing::instrument(skip_all)]
pub async fn bw_accounting_client_loop(
    ctx: AnyCtx<Config>,
    stream: picomux::Stream,
) -> anyhow::Result<()> {
    /// Threshold (in bytes) below which the first token is sent.
    const INITIAL_THRESHOLD: usize = 1_000_000;
    /// Amount the threshold grows by each time the balance is seen at zero.
    const THRESHOLD_STEP: usize = 1_000_000;
    /// Upper bound for the threshold.
    const MAX_THRESHOLD: usize = 100_000_000;
    /// Pause after sending a token before re-checking the balance.
    const POST_TOKEN_PAUSE: Duration = Duration::from_millis(200);

    tracing::info!("BW ACCOUNT START!");
    let (mut read, mut write) = stream.split();

    // Until the first report arrives, behave as if bandwidth were unlimited so that the
    // writer does not send a token prematurely.
    let bytes_left = Arc::new(AtomicUsize::new(usize::MAX));
    let change_event = Arc::new(Event::new());

    let read_fut = {
        let bytes_left = bytes_left.clone();
        let change_event = change_event.clone();
        async move {
            let mut buf = [0u8; 8];
            loop {
                read.read_exact(&mut buf).await?;
                // Clamp before narrowing: a plain `as usize` would wrap on targets where
                // `usize` is narrower than 64 bits, turning a huge balance into a tiny
                // one and triggering spurious token consumption.
                let bytes = u64::from_be_bytes(buf).min(usize::MAX as u64) as usize;
                bytes_left.store(bytes, Ordering::SeqCst);
                change_event.notify_one();
            }
        }
    };

    let write_fut = {
        let change_event = change_event.clone();
        let bytes_left = bytes_left.clone();
        let ctx = ctx.clone();
        async move {
            let mut threshold = INITIAL_THRESHOLD;
            loop {
                // Resolves with the current balance once it is below the threshold.
                let remaining = change_event
                    .wait_until(|| {
                        let left = bytes_left.load(Ordering::SeqCst);
                        tracing::trace!(left, threshold, "obtained remote bw");
                        if left < threshold {
                            Some(left)
                        } else {
                            None
                        }
                    })
                    .await;

                // Hitting zero means the top-up came too late; start topping up earlier.
                if remaining == 0 {
                    tracing::debug!(threshold, "totally ran out of bw, bumping threshold up");
                    threshold = (threshold + THRESHOLD_STEP).min(MAX_THRESHOLD);
                }

                let (token, sig) = bw_token_consume(&ctx).await?;
                // Send the encoded token and its newline terminator in a single write.
                let mut line = BASE64_STANDARD_NO_PAD.encode((token, sig).stdcode());
                line.push('\n');
                write.write_all(line.as_bytes()).await?;
                tracing::debug!(threshold, remaining, "consuming a bandwidth token");

                // Pause before the next check so an updated balance has a chance to
                // arrive before another token is spent.
                smol::Timer::after(POST_TOKEN_PAUSE).await;
            }
        }
    };

    // Whichever half finishes first (necessarily with an error) decides the result.
    (read_fut, write_fut).race().await
}