1use std::cmp;
10use std::collections::VecDeque;
11use std::error::Error;
12use std::fmt;
13use std::io;
14
15use gst;
16
17lazy_static! {
18 static ref CAT: gst::DebugCategory = {
19 gst::DebugCategory::new(
20 "rsadapter",
21 gst::DebugColorFlags::empty(),
22 "Rust buffer adapter",
23 )
24 };
25}
26
/// A FIFO byte queue built from readable-mapped GStreamer buffers.
///
/// Buffers are appended with `push` and consumed from the front with
/// `peek`, `peek_into`, `get_buffer`, `flush` or through `io::Read`.
#[derive(Default, Debug)]
pub struct Adapter {
    /// Queued buffers, oldest first; `push` appends at the back and `flush`
    /// removes from the front.
    deque: VecDeque<gst::MappedBuffer<gst::buffer::Readable>>,
    /// Number of bytes currently available, i.e. the sum of all queued
    /// buffer sizes minus `skip`.
    size: usize,
    /// Number of bytes at the start of the front buffer that have already
    /// been flushed; reset to 0 whenever the front buffer is removed.
    skip: usize,
    /// Reusable storage that `peek` hands out a slice of when the requested
    /// range is not served directly from the front buffer.
    scratch: Vec<u8>,
}
34
/// Errors reported by the adapter's consuming operations.
#[derive(Debug, PartialEq, Eq)]
pub enum AdapterError {
    /// Fewer bytes are queued than the caller asked for.
    NotEnoughData,
}

impl AdapterError {
    /// Human-readable text shared by the `Display` and `Error` impls.
    fn message(&self) -> &'static str {
        match *self {
            AdapterError::NotEnoughData => "Not enough data",
        }
    }
}

impl fmt::Display for AdapterError {
    /// Writes the error's message text verbatim.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(self.message())
    }
}

impl Error for AdapterError {
    /// Returns the same text that `Display` produces.
    fn description(&self) -> &str {
        self.message()
    }
}
51
52impl Adapter {
53 pub fn new() -> Adapter {
54 Adapter {
55 deque: VecDeque::new(),
56 size: 0,
57 skip: 0,
58 scratch: Vec::new(),
59 }
60 }
61
62 pub fn push(&mut self, buffer: gst::Buffer) {
63 let size = buffer.get_size();
64
65 self.size += size;
66 gst_trace!(
67 CAT,
68 "Storing {:?} of size {}, now have size {}",
69 buffer,
70 size,
71 self.size
72 );
73 self.deque
74 .push_back(buffer.into_mapped_buffer_readable().unwrap());
75 }
76
77 pub fn clear(&mut self) {
78 self.deque.clear();
79 self.size = 0;
80 self.skip = 0;
81 self.scratch.clear();
82 gst_trace!(CAT, "Cleared adapter");
83 }
84
85 pub fn get_available(&self) -> usize {
86 self.size
87 }
88
89 fn copy_data(
90 deque: &VecDeque<gst::MappedBuffer<gst::buffer::Readable>>,
91 skip: usize,
92 data: &mut [u8],
93 size: usize,
94 ) {
95 let mut skip = skip;
96 let mut left = size;
97 let mut idx = 0;
98
99 gst_trace!(CAT, "Copying {} bytes", size);
100
101 for item in deque {
102 let data_item = item.as_slice();
103
104 let to_copy = cmp::min(left, data_item.len() - skip);
105 gst_trace!(
106 CAT,
107 "Copying {} bytes from {:?}, {} more to go",
108 to_copy,
109 item.get_buffer(),
110 left - to_copy
111 );
112
113 data[idx..idx + to_copy].copy_from_slice(&data_item[skip..skip + to_copy]);
114 skip = 0;
115 idx += to_copy;
116 left -= to_copy;
117 if left == 0 {
118 break;
119 }
120 }
121 assert_eq!(left, 0);
122 }
123
124 pub fn peek_into(&self, data: &mut [u8]) -> Result<(), AdapterError> {
125 let size = data.len();
126
127 if self.size < size {
128 gst_debug!(
129 CAT,
130 "Peeking {} bytes into, not enough data: have {}",
131 size,
132 self.size
133 );
134 return Err(AdapterError::NotEnoughData);
135 }
136
137 gst_trace!(CAT, "Peeking {} bytes into", size);
138 if size == 0 {
139 return Ok(());
140 }
141
142 Self::copy_data(&self.deque, self.skip, data, size);
143 Ok(())
144 }
145
146 pub fn peek(&mut self, size: usize) -> Result<&[u8], AdapterError> {
147 if self.size < size {
148 gst_debug!(
149 CAT,
150 "Peeking {} bytes, not enough data: have {}",
151 size,
152 self.size
153 );
154 return Err(AdapterError::NotEnoughData);
155 }
156
157 if size == 0 {
158 return Ok(&[]);
159 }
160
161 if let Some(front) = self.deque.front() {
162 gst_trace!(CAT, "Peeking {} bytes, subbuffer of first", size);
163 if front.get_size() - self.skip >= size {
164 return Ok(&front.as_slice()[self.skip..self.skip + size]);
165 }
166 }
167
168 gst_trace!(CAT, "Peeking {} bytes, copy to scratch", size);
169
170 self.scratch.truncate(0);
171 self.scratch.reserve(size);
172 {
173 let data = self.scratch.as_mut_slice();
174 Self::copy_data(&self.deque, self.skip, data, size);
175 }
176
177 Ok(self.scratch.as_slice())
178 }
179
180 pub fn get_buffer(&mut self, size: usize) -> Result<gst::Buffer, AdapterError> {
181 if self.size < size {
182 gst_debug!(
183 CAT,
184 "Get buffer of {} bytes, not enough data: have {}",
185 size,
186 self.size
187 );
188 return Err(AdapterError::NotEnoughData);
189 }
190
191 if size == 0 {
192 return Ok(gst::Buffer::new());
193 }
194
195 let sub = self.deque.front().and_then(|front| {
196 if front.get_size() - self.skip >= size {
197 gst_trace!(CAT, "Get buffer of {} bytes, subbuffer of first", size);
198 let new = front
199 .get_buffer()
200 .copy_region(*gst::BUFFER_COPY_ALL, self.skip, Some(size))
201 .unwrap();
202 Some(new)
203 } else {
204 None
205 }
206 });
207
208 if let Some(s) = sub {
209 self.flush(size).unwrap();
210 return Ok(s);
211 }
212
213 gst_trace!(CAT, "Get buffer of {} bytes, copy into new buffer", size);
214 let mut new = gst::Buffer::with_size(size).unwrap();
215 {
216 let mut map = new.get_mut().unwrap().map_writable().unwrap();
217 let data = map.as_mut_slice();
218 Self::copy_data(&self.deque, self.skip, data, size);
219 }
220 self.flush(size).unwrap();
221 Ok(new)
222 }
223
224 pub fn flush(&mut self, size: usize) -> Result<(), AdapterError> {
225 if self.size < size {
226 gst_debug!(
227 CAT,
228 "Flush {} bytes, not enough data: have {}",
229 size,
230 self.size
231 );
232 return Err(AdapterError::NotEnoughData);
233 }
234
235 if size == 0 {
236 return Ok(());
237 }
238
239 gst_trace!(CAT, "Flushing {} bytes, have {}", size, self.size);
240
241 let mut left = size;
242 while left > 0 {
243 let front_size = self.deque.front().unwrap().get_size() - self.skip;
244
245 if front_size <= left {
246 gst_trace!(
247 CAT,
248 "Flushing whole {:?}, {} more to go",
249 self.deque.front().map(|b| b.get_buffer()),
250 left - front_size
251 );
252 self.deque.pop_front();
253 self.size -= front_size;
254 self.skip = 0;
255 left -= front_size;
256 } else {
257 gst_trace!(
258 CAT,
259 "Flushing partial {:?}, {} more left",
260 self.deque.front().map(|b| b.get_buffer()),
261 front_size - left
262 );
263 self.skip += left;
264 self.size -= left;
265 left = 0;
266 }
267 }
268
269 Ok(())
270 }
271}
272
273impl io::Read for Adapter {
274 fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
275 let mut len = self.size;
276
277 if len == 0 {
278 return Err(io::Error::new(
279 io::ErrorKind::WouldBlock,
280 format!(
281 "Missing data: requesting {} but only got {}.",
282 buf.len(),
283 len
284 ),
285 ));
286 }
287
288 if buf.len() < len {
289 len = buf.len();
290 }
291
292 Self::copy_data(&self.deque, self.skip, buf, len);
293 self.flush(len).unwrap();
294 Ok(len)
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use gst;

    // Queues a fresh buffer of `size` bytes and checks the resulting fill level.
    fn push_sized(adapter: &mut Adapter, size: usize, expected_available: usize) {
        adapter.push(gst::Buffer::with_size(size).unwrap());
        assert_eq!(adapter.get_available(), expected_available);
    }

    // Takes `size` bytes out as a buffer and checks both the remaining fill
    // level and the size of the returned buffer.
    fn take_sized(adapter: &mut Adapter, size: usize, expected_available: usize) {
        let taken = adapter.get_buffer(size).unwrap();
        assert_eq!(adapter.get_available(), expected_available);
        assert_eq!(taken.get_size(), size);
    }

    #[test]
    fn test_push_get() {
        gst::init().unwrap();

        let mut adapter = Adapter::new();

        push_sized(&mut adapter, 100, 100);
        push_sized(&mut adapter, 20, 120);

        take_sized(&mut adapter, 20, 100);
        take_sized(&mut adapter, 90, 10);

        push_sized(&mut adapter, 20, 30);

        take_sized(&mut adapter, 20, 10);
        take_sized(&mut adapter, 10, 0);

        // Nothing is left, so asking for even a single byte must fail.
        let exhausted = adapter.get_buffer(1);
        assert_eq!(exhausted.err().unwrap(), AdapterError::NotEnoughData);
    }
}