Skip to main content

moq_lite/model/
bandwidth.rs

1//! Bandwidth estimation, split into a [BandwidthProducer] and [BandwidthConsumer] handle.
2//!
3//! A [BandwidthProducer] is used to set the current estimated bitrate, notifying consumers.
4//! A [BandwidthConsumer] can read the current estimate and wait for changes.
5
6use std::task::Poll;
7
8use crate::{Error, Result};
9
10#[derive(Default)]
11struct State {
12	bitrate: Option<u64>,
13	abort: Option<Error>,
14}
15
16/// Produces bandwidth estimates, notifying consumers when the value changes.
17#[derive(Clone)]
18pub struct BandwidthProducer {
19	state: conducer::Producer<State>,
20}
21
22impl BandwidthProducer {
23	pub fn new() -> Self {
24		Self {
25			state: conducer::Producer::default(),
26		}
27	}
28
29	/// Set the current bandwidth estimate in bits per second.
30	pub fn set(&self, bitrate: Option<u64>) -> Result<()> {
31		let mut state = self.modify()?;
32		if state.bitrate != bitrate {
33			state.bitrate = bitrate;
34		}
35		Ok(())
36	}
37
38	/// Create a new consumer for the bandwidth estimate.
39	pub fn consume(&self) -> BandwidthConsumer {
40		BandwidthConsumer {
41			state: self.state.consume(),
42			last: None,
43		}
44	}
45
46	/// Close the producer with an error, notifying all consumers.
47	pub fn close(&self, err: Error) -> Result<()> {
48		let mut state = self.modify()?;
49		state.abort = Some(err);
50		state.close();
51		Ok(())
52	}
53
54	/// Block until the channel is closed.
55	pub async fn closed(&self) {
56		self.state.closed().await
57	}
58
59	/// Block until there are no active consumers.
60	pub async fn unused(&self) -> Result<()> {
61		self.state
62			.unused()
63			.await
64			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
65	}
66
67	/// Block until there is at least one active consumer.
68	pub async fn used(&self) -> Result<()> {
69		self.state
70			.used()
71			.await
72			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
73	}
74
75	fn modify(&self) -> Result<conducer::Mut<'_, State>> {
76		self.state
77			.write()
78			.map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
79	}
80}
81
82impl Default for BandwidthProducer {
83	fn default() -> Self {
84		Self::new()
85	}
86}
87
88/// Consumes bandwidth estimates, allowing reads and async change notifications.
89#[derive(Clone)]
90pub struct BandwidthConsumer {
91	state: conducer::Consumer<State>,
92	last: Option<u64>,
93}
94
95impl BandwidthConsumer {
96	/// Get the current bandwidth estimate synchronously.
97	pub fn peek(&self) -> Option<u64> {
98		self.state.read().bitrate
99	}
100
101	/// Poll for a bandwidth change without blocking.
102	pub fn poll_changed(&mut self, waiter: &conducer::Waiter) -> Poll<Option<u64>> {
103		let last = self.last;
104
105		match self.state.poll(waiter, |state| {
106			if state.bitrate != last {
107				Poll::Ready(state.bitrate)
108			} else {
109				Poll::Pending
110			}
111		}) {
112			Poll::Ready(Ok(bitrate)) => {
113				self.last = bitrate;
114				Poll::Ready(bitrate)
115			}
116			// Channel closed
117			Poll::Ready(Err(_)) => Poll::Ready(None),
118			Poll::Pending => Poll::Pending,
119		}
120	}
121
122	/// Block until the bandwidth estimate changes. Returns the new value.
123	/// Returns `None` if the producer is dropped or the estimate is unavailable.
124	pub async fn changed(&mut self) -> Option<u64> {
125		conducer::wait(|waiter| self.poll_changed(waiter)).await
126	}
127}