moq_transfork/model/
frame.rs1use bytes::{Bytes, BytesMut};
2use std::{fmt, ops};
3use tokio::sync::watch;
4
5use crate::Error;
6
/// Static description of a single frame.
///
/// A copy of this value is carried by both the [`FrameProducer`] and the
/// [`FrameConsumer`] created through [`Frame::produce`].
// `Eq` is derived alongside `PartialEq` because the only field is a `usize`,
// whose equality is total.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
    /// Declared size of the frame.
    ///
    /// NOTE(review): presumably the total payload length in bytes; nothing in
    /// this file checks written chunks against it — confirm with callers.
    pub size: usize,
}
12
13impl Frame {
14 pub fn new(size: usize) -> Frame {
15 Self { size }
16 }
17
18 pub fn produce(self) -> (FrameProducer, FrameConsumer) {
19 let (send, recv) = watch::channel(FrameState::default());
20
21 let writer = FrameProducer::new(send, self.clone());
22 let reader = FrameConsumer::new(recv, self);
23
24 (writer, reader)
25 }
26}
27
28struct FrameState {
29 chunks: Vec<Bytes>,
31
32 closed: Result<(), Error>,
34}
35
36impl Default for FrameState {
37 fn default() -> Self {
38 Self {
39 chunks: Vec::new(),
40 closed: Ok(()),
41 }
42 }
43}
44
45impl fmt::Debug for FrameState {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 f.debug_struct("FrameState")
48 .field("chunks", &self.chunks.len())
49 .field("closed", &self.closed)
50 .finish()
51 }
52}
53
54#[derive(Clone, Debug)]
56pub struct FrameProducer {
57 state: watch::Sender<FrameState>,
59
60 pub info: Frame,
62}
63
64impl FrameProducer {
65 fn new(state: watch::Sender<FrameState>, info: Frame) -> Self {
66 Self { state, info }
67 }
68
69 pub fn write<B: Into<Bytes>>(&mut self, chunk: B) {
70 self.state.send_modify(|state| state.chunks.push(chunk.into()));
71 }
72
73 pub fn close(self, err: Error) {
75 self.state.send_modify(|state| state.closed = Err(err));
76 }
77
78 pub fn subscribe(&self) -> FrameConsumer {
80 FrameConsumer::new(self.state.subscribe(), self.info.clone())
81 }
82}
83
84impl ops::Deref for FrameProducer {
85 type Target = Frame;
86
87 fn deref(&self) -> &Self::Target {
88 &self.info
89 }
90}
91
92#[derive(Clone, Debug)]
94pub struct FrameConsumer {
95 state: watch::Receiver<FrameState>,
97
98 pub info: Frame,
100
101 index: usize,
104}
105
106impl FrameConsumer {
107 fn new(state: watch::Receiver<FrameState>, group: Frame) -> Self {
108 Self {
109 state,
110 info: group,
111 index: 0,
112 }
113 }
114
115 pub async fn read(&mut self) -> Result<Option<Bytes>, Error> {
117 loop {
118 {
119 let state = self.state.borrow_and_update();
120
121 if let Some(chunk) = state.chunks.get(self.index).cloned() {
122 self.index += 1;
123 return Ok(Some(chunk));
124 }
125
126 state.closed.clone()?;
127 }
128
129 if self.state.changed().await.is_err() {
130 return Ok(None);
131 }
132 }
133 }
134
135 pub async fn read_all(&mut self) -> Result<Bytes, Error> {
137 if let Ok(err) = self.state.wait_for(|s| s.closed.is_err()).await {
140 return Err(err.closed.clone().unwrap_err());
141 };
142
143 let state = self.state.borrow_and_update();
145 let chunks = &state.chunks[self.index..];
146 self.index = state.chunks.len();
147
148 let size = chunks.iter().map(Bytes::len).sum();
150 let mut buf = BytesMut::with_capacity(size);
151
152 for chunk in chunks {
154 buf.extend_from_slice(chunk);
155 }
156
157 Ok(buf.freeze())
158 }
159
160 pub async fn closed(&self) -> Result<(), Error> {
161 match self.state.clone().wait_for(|state| state.closed.is_err()).await {
162 Ok(state) => state.closed.clone(),
163 Err(_) => Ok(()),
164 }
165 }
166}
167
168impl ops::Deref for FrameConsumer {
169 type Target = Frame;
170
171 fn deref(&self) -> &Self::Target {
172 &self.info
173 }
174}