use std::task::Poll;
use crate::{Error, Result};
/// Shared state written by `BandwidthProducer` and observed by `BandwidthConsumer`.
///
/// The derived `Default` starts with no bitrate and no abort error.
#[derive(Default)]
struct State {
/// Most recently stored bitrate, or `None` if none has been set.
/// NOTE(review): the unit is not stated in this file (presumably bits per second) — confirm.
bitrate: Option<u64>,
/// Error recorded by `BandwidthProducer::close`. Producer methods clone it as the
/// failure reason and fall back to `Error::Dropped` when it is `None`.
abort: Option<Error>,
}
/// Write-side handle for a bitrate value.
///
/// Values are stored with `set`, and read-side handles are created with `consume`.
/// NOTE(review): clones presumably share the same underlying state via
/// `conducer::Producer` — confirm against the conducer documentation.
#[derive(Clone)]
pub struct BandwidthProducer {
// Handle to the `State` managed by the conducer crate.
state: conducer::Producer<State>,
}
impl BandwidthProducer {
pub fn new() -> Self {
Self {
state: conducer::Producer::default(),
}
}
pub fn set(&self, bitrate: Option<u64>) -> Result<()> {
let mut state = self.modify()?;
if state.bitrate != bitrate {
state.bitrate = bitrate;
}
Ok(())
}
pub fn consume(&self) -> BandwidthConsumer {
BandwidthConsumer {
state: self.state.consume(),
last: None,
}
}
pub fn close(&self, err: Error) -> Result<()> {
let mut state = self.modify()?;
state.abort = Some(err);
state.close();
Ok(())
}
pub async fn closed(&self) {
self.state.closed().await
}
pub async fn unused(&self) -> Result<()> {
self.state
.unused()
.await
.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
}
pub async fn used(&self) -> Result<()> {
self.state
.used()
.await
.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
}
fn modify(&self) -> Result<conducer::Mut<'_, State>> {
self.state
.write()
.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
}
}
impl Default for BandwidthProducer {
fn default() -> Self {
Self::new()
}
}
/// Read-side handle for a bitrate value, created by `BandwidthProducer::consume`.
///
/// The derived `Clone` copies `last` as well, so a clone starts from the same
/// "last returned" value as the original.
#[derive(Clone)]
pub struct BandwidthConsumer {
// Handle to the `State` managed by the conducer crate.
state: conducer::Consumer<State>,
// Bitrate most recently returned by `poll_changed`; `None` until a value has been returned.
last: Option<u64>,
}
impl BandwidthConsumer {
    /// Returns the currently stored bitrate without updating what this consumer
    /// considers its last returned value.
    pub fn peek(&self) -> Option<u64> {
        let state = self.state.read();
        state.bitrate
    }

    /// Polls for a bitrate that differs from the last one this consumer returned.
    ///
    /// * `Poll::Ready(bitrate)` — the stored bitrate differs from the last returned
    ///   value; it is remembered as the new last value.
    /// * `Poll::Ready(None)` — also returned when the underlying poll reports an
    ///   error, so `None` alone does not distinguish "bitrate set to `None`"
    ///   from that failure.
    ///   NOTE(review): the error presumably means the producer closed or was
    ///   dropped — confirm against `conducer::Consumer::poll`.
    /// * `Poll::Pending` — nothing new yet.
    pub fn poll_changed(&mut self, waiter: &conducer::Waiter) -> Poll<Option<u64>> {
        let seen = self.last;
        let polled = self.state.poll(waiter, |state| {
            if state.bitrate == seen {
                Poll::Pending
            } else {
                Poll::Ready(state.bitrate)
            }
        });

        match polled {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(_)) => Poll::Ready(None),
            Poll::Ready(Ok(current)) => {
                self.last = current;
                Poll::Ready(current)
            }
        }
    }

    /// Async wrapper around [`Self::poll_changed`], driven by `conducer::wait`.
    pub async fn changed(&mut self) -> Option<u64> {
        let next = conducer::wait(|waiter| self.poll_changed(waiter));
        next.await
    }
}