moq_lite/model/
bandwidth.rs1use std::task::Poll;
7
8use crate::{Error, Result};
9
10#[derive(Default)]
11struct State {
12 bitrate: Option<u64>,
13 abort: Option<Error>,
14}
15
16#[derive(Clone)]
18pub struct BandwidthProducer {
19 state: conducer::Producer<State>,
20}
21
22impl BandwidthProducer {
23 pub fn new() -> Self {
24 Self {
25 state: conducer::Producer::default(),
26 }
27 }
28
29 pub fn set(&self, bitrate: Option<u64>) -> Result<()> {
31 let mut state = self.modify()?;
32 if state.bitrate != bitrate {
33 state.bitrate = bitrate;
34 }
35 Ok(())
36 }
37
38 pub fn consume(&self) -> BandwidthConsumer {
40 BandwidthConsumer {
41 state: self.state.consume(),
42 last: None,
43 }
44 }
45
46 pub fn close(&self, err: Error) -> Result<()> {
48 let mut state = self.modify()?;
49 state.abort = Some(err);
50 state.close();
51 Ok(())
52 }
53
54 pub async fn closed(&self) {
56 self.state.closed().await
57 }
58
59 pub async fn unused(&self) -> Result<()> {
61 self.state
62 .unused()
63 .await
64 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
65 }
66
67 pub async fn used(&self) -> Result<()> {
69 self.state
70 .used()
71 .await
72 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
73 }
74
75 fn modify(&self) -> Result<conducer::Mut<'_, State>> {
76 self.state
77 .write()
78 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
79 }
80}
81
82impl Default for BandwidthProducer {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88#[derive(Clone)]
90pub struct BandwidthConsumer {
91 state: conducer::Consumer<State>,
92 last: Option<u64>,
93}
94
95impl BandwidthConsumer {
96 pub fn peek(&self) -> Option<u64> {
98 self.state.read().bitrate
99 }
100
101 pub fn poll_changed(&mut self, waiter: &conducer::Waiter) -> Poll<Option<u64>> {
103 let last = self.last;
104
105 match self.state.poll(waiter, |state| {
106 if state.bitrate != last {
107 Poll::Ready(state.bitrate)
108 } else {
109 Poll::Pending
110 }
111 }) {
112 Poll::Ready(Ok(bitrate)) => {
113 self.last = bitrate;
114 Poll::Ready(bitrate)
115 }
116 Poll::Ready(Err(_)) => Poll::Ready(None),
118 Poll::Pending => Poll::Pending,
119 }
120 }
121
122 pub async fn changed(&mut self) -> Option<u64> {
125 conducer::wait(|waiter| self.poll_changed(waiter)).await
126 }
127}