Skip to main content

moq_lite/model/
bandwidth.rs

//! Bandwidth estimation, split into [`BandwidthProducer`] and [`BandwidthConsumer`] handles.
//!
//! A [`BandwidthProducer`] is used to set the current estimated bitrate, notifying consumers.
//! A [`BandwidthConsumer`] can read the current estimate and wait for changes.
5
6use std::task::Poll;
7
8use crate::{Error, Result};
9
10#[derive(Default)]
11struct State {
12	bitrate: Option<u64>,
13	abort: Option<Error>,
14}
15
16/// Produces bandwidth estimates, notifying consumers when the value changes.
17#[derive(Clone)]
18pub struct BandwidthProducer {
19	state: conducer::Producer<State>,
20}
21
22impl BandwidthProducer {
23	/// Create a fresh producer with no current estimate.
24	pub fn new() -> Self {
25		Self {
26			state: conducer::Producer::default(),
27		}
28	}
29
30	/// Set the current bandwidth estimate in bits per second.
31	pub fn set(&self, bitrate: Option<u64>) -> Result<()> {
32		let mut state = self.modify()?;
33		if state.bitrate != bitrate {
34			state.bitrate = bitrate;
35		}
36		Ok(())
37	}
38
39	/// Create a new consumer for the bandwidth estimate.
40	pub fn consume(&self) -> BandwidthConsumer {
41		BandwidthConsumer {
42			state: self.state.consume(),
43			last: None,
44		}
45	}
46
47	/// Close the producer with an error, notifying all consumers.
48	pub fn close(&self, err: Error) -> Result<()> {
49		let mut state = self.modify()?;
50		state.abort = Some(err);
51		state.close();
52		Ok(())
53	}
54
55	/// Block until the channel is closed.
56	pub async fn closed(&self) {
57		self.state.closed().await
58	}
59
60	/// Block until there are no active consumers.
61	pub async fn unused(&self) -> Result<()> {
62		self.state
63			.unused()
64			.await
65			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
66	}
67
68	/// Block until there is at least one active consumer.
69	pub async fn used(&self) -> Result<()> {
70		self.state
71			.used()
72			.await
73			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
74	}
75
76	fn modify(&self) -> Result<conducer::Mut<'_, State>> {
77		self.state
78			.write()
79			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
80	}
81}
82
83impl Default for BandwidthProducer {
84	fn default() -> Self {
85		Self::new()
86	}
87}
88
89/// Consumes bandwidth estimates, allowing reads and async change notifications.
90#[derive(Clone)]
91pub struct BandwidthConsumer {
92	state: conducer::Consumer<State>,
93	last: Option<u64>,
94}
95
96impl BandwidthConsumer {
97	/// Get the current bandwidth estimate synchronously.
98	pub fn peek(&self) -> Option<u64> {
99		self.state.read().bitrate
100	}
101
102	/// Poll for a bandwidth change without blocking.
103	pub fn poll_changed(&mut self, waiter: &conducer::Waiter) -> Poll<Option<u64>> {
104		let last = self.last;
105
106		match self.state.poll(waiter, |state| {
107			if state.bitrate != last {
108				Poll::Ready(state.bitrate)
109			} else {
110				Poll::Pending
111			}
112		}) {
113			Poll::Ready(Ok(bitrate)) => {
114				self.last = bitrate;
115				Poll::Ready(bitrate)
116			}
117			// Channel closed
118			Poll::Ready(Err(_)) => Poll::Ready(None),
119			Poll::Pending => Poll::Pending,
120		}
121	}
122
123	/// Block until the bandwidth estimate changes. Returns the new value.
124	/// Returns `None` if the producer is dropped or the estimate is unavailable.
125	pub async fn changed(&mut self) -> Option<u64> {
126		conducer::wait(|waiter| self.poll_changed(waiter)).await
127	}
128}