moq_lite/model/
bandwidth.rs1use std::task::Poll;
7
8use crate::{Error, Result};
9
10#[derive(Default)]
11struct State {
12 bitrate: Option<u64>,
13 abort: Option<Error>,
14}
15
16#[derive(Clone)]
18pub struct BandwidthProducer {
19 state: conducer::Producer<State>,
20}
21
22impl BandwidthProducer {
23 pub fn new() -> Self {
25 Self {
26 state: conducer::Producer::default(),
27 }
28 }
29
30 pub fn set(&self, bitrate: Option<u64>) -> Result<()> {
32 let mut state = self.modify()?;
33 if state.bitrate != bitrate {
34 state.bitrate = bitrate;
35 }
36 Ok(())
37 }
38
39 pub fn consume(&self) -> BandwidthConsumer {
41 BandwidthConsumer {
42 state: self.state.consume(),
43 last: None,
44 }
45 }
46
47 pub fn close(&self, err: Error) -> Result<()> {
49 let mut state = self.modify()?;
50 state.abort = Some(err);
51 state.close();
52 Ok(())
53 }
54
55 pub async fn closed(&self) {
57 self.state.closed().await
58 }
59
60 pub async fn unused(&self) -> Result<()> {
62 self.state
63 .unused()
64 .await
65 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
66 }
67
68 pub async fn used(&self) -> Result<()> {
70 self.state
71 .used()
72 .await
73 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
74 }
75
76 fn modify(&self) -> Result<conducer::Mut<'_, State>> {
77 self.state
78 .write()
79 .map_err(|r| r.abort.clone().unwrap_or(Error::Dropped))
80 }
81}
82
83impl Default for BandwidthProducer {
84 fn default() -> Self {
85 Self::new()
86 }
87}
88
89#[derive(Clone)]
91pub struct BandwidthConsumer {
92 state: conducer::Consumer<State>,
93 last: Option<u64>,
94}
95
96impl BandwidthConsumer {
97 pub fn peek(&self) -> Option<u64> {
99 self.state.read().bitrate
100 }
101
102 pub fn poll_changed(&mut self, waiter: &conducer::Waiter) -> Poll<Option<u64>> {
104 let last = self.last;
105
106 match self.state.poll(waiter, |state| {
107 if state.bitrate != last {
108 Poll::Ready(state.bitrate)
109 } else {
110 Poll::Pending
111 }
112 }) {
113 Poll::Ready(Ok(bitrate)) => {
114 self.last = bitrate;
115 Poll::Ready(bitrate)
116 }
117 Poll::Ready(Err(_)) => Poll::Ready(None),
119 Poll::Pending => Poll::Pending,
120 }
121 }
122
123 pub async fn changed(&mut self) -> Option<u64> {
126 conducer::wait(|waiter| self.poll_changed(waiter)).await
127 }
128}