gst_plugin/
adapter.rs

1// Copyright (C) 2016-2017 Sebastian Dröge <sebastian@centricular.com>
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9use std::cmp;
10use std::collections::VecDeque;
11use std::error::Error;
12use std::fmt;
13use std::io;
14
15use gst;
16
17lazy_static! {
18    static ref CAT: gst::DebugCategory = {
19        gst::DebugCategory::new(
20            "rsadapter",
21            gst::DebugColorFlags::empty(),
22            "Rust buffer adapter",
23        )
24    };
25}
26
27#[derive(Default, Debug)]
28pub struct Adapter {
29    deque: VecDeque<gst::MappedBuffer<gst::buffer::Readable>>,
30    size: usize,
31    skip: usize,
32    scratch: Vec<u8>,
33}
34
35#[derive(Debug, PartialEq, Eq)]
36pub enum AdapterError {
37    NotEnoughData,
38}
39
40impl fmt::Display for AdapterError {
41    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
42        write!(f, "Not enough data")
43    }
44}
45
46impl Error for AdapterError {
47    fn description(&self) -> &str {
48        "Not enough data"
49    }
50}
51
52impl Adapter {
53    pub fn new() -> Adapter {
54        Adapter {
55            deque: VecDeque::new(),
56            size: 0,
57            skip: 0,
58            scratch: Vec::new(),
59        }
60    }
61
62    pub fn push(&mut self, buffer: gst::Buffer) {
63        let size = buffer.get_size();
64
65        self.size += size;
66        gst_trace!(
67            CAT,
68            "Storing {:?} of size {}, now have size {}",
69            buffer,
70            size,
71            self.size
72        );
73        self.deque
74            .push_back(buffer.into_mapped_buffer_readable().unwrap());
75    }
76
77    pub fn clear(&mut self) {
78        self.deque.clear();
79        self.size = 0;
80        self.skip = 0;
81        self.scratch.clear();
82        gst_trace!(CAT, "Cleared adapter");
83    }
84
85    pub fn get_available(&self) -> usize {
86        self.size
87    }
88
89    fn copy_data(
90        deque: &VecDeque<gst::MappedBuffer<gst::buffer::Readable>>,
91        skip: usize,
92        data: &mut [u8],
93        size: usize,
94    ) {
95        let mut skip = skip;
96        let mut left = size;
97        let mut idx = 0;
98
99        gst_trace!(CAT, "Copying {} bytes", size);
100
101        for item in deque {
102            let data_item = item.as_slice();
103
104            let to_copy = cmp::min(left, data_item.len() - skip);
105            gst_trace!(
106                CAT,
107                "Copying {} bytes from {:?}, {} more to go",
108                to_copy,
109                item.get_buffer(),
110                left - to_copy
111            );
112
113            data[idx..idx + to_copy].copy_from_slice(&data_item[skip..skip + to_copy]);
114            skip = 0;
115            idx += to_copy;
116            left -= to_copy;
117            if left == 0 {
118                break;
119            }
120        }
121        assert_eq!(left, 0);
122    }
123
124    pub fn peek_into(&self, data: &mut [u8]) -> Result<(), AdapterError> {
125        let size = data.len();
126
127        if self.size < size {
128            gst_debug!(
129                CAT,
130                "Peeking {} bytes into, not enough data: have {}",
131                size,
132                self.size
133            );
134            return Err(AdapterError::NotEnoughData);
135        }
136
137        gst_trace!(CAT, "Peeking {} bytes into", size);
138        if size == 0 {
139            return Ok(());
140        }
141
142        Self::copy_data(&self.deque, self.skip, data, size);
143        Ok(())
144    }
145
146    pub fn peek(&mut self, size: usize) -> Result<&[u8], AdapterError> {
147        if self.size < size {
148            gst_debug!(
149                CAT,
150                "Peeking {} bytes, not enough data: have {}",
151                size,
152                self.size
153            );
154            return Err(AdapterError::NotEnoughData);
155        }
156
157        if size == 0 {
158            return Ok(&[]);
159        }
160
161        if let Some(front) = self.deque.front() {
162            gst_trace!(CAT, "Peeking {} bytes, subbuffer of first", size);
163            if front.get_size() - self.skip >= size {
164                return Ok(&front.as_slice()[self.skip..self.skip + size]);
165            }
166        }
167
168        gst_trace!(CAT, "Peeking {} bytes, copy to scratch", size);
169
170        self.scratch.truncate(0);
171        self.scratch.reserve(size);
172        {
173            let data = self.scratch.as_mut_slice();
174            Self::copy_data(&self.deque, self.skip, data, size);
175        }
176
177        Ok(self.scratch.as_slice())
178    }
179
180    pub fn get_buffer(&mut self, size: usize) -> Result<gst::Buffer, AdapterError> {
181        if self.size < size {
182            gst_debug!(
183                CAT,
184                "Get buffer of {} bytes, not enough data: have {}",
185                size,
186                self.size
187            );
188            return Err(AdapterError::NotEnoughData);
189        }
190
191        if size == 0 {
192            return Ok(gst::Buffer::new());
193        }
194
195        let sub = self.deque.front().and_then(|front| {
196            if front.get_size() - self.skip >= size {
197                gst_trace!(CAT, "Get buffer of {} bytes, subbuffer of first", size);
198                let new = front
199                    .get_buffer()
200                    .copy_region(*gst::BUFFER_COPY_ALL, self.skip, Some(size))
201                    .unwrap();
202                Some(new)
203            } else {
204                None
205            }
206        });
207
208        if let Some(s) = sub {
209            self.flush(size).unwrap();
210            return Ok(s);
211        }
212
213        gst_trace!(CAT, "Get buffer of {} bytes, copy into new buffer", size);
214        let mut new = gst::Buffer::with_size(size).unwrap();
215        {
216            let mut map = new.get_mut().unwrap().map_writable().unwrap();
217            let data = map.as_mut_slice();
218            Self::copy_data(&self.deque, self.skip, data, size);
219        }
220        self.flush(size).unwrap();
221        Ok(new)
222    }
223
224    pub fn flush(&mut self, size: usize) -> Result<(), AdapterError> {
225        if self.size < size {
226            gst_debug!(
227                CAT,
228                "Flush {} bytes, not enough data: have {}",
229                size,
230                self.size
231            );
232            return Err(AdapterError::NotEnoughData);
233        }
234
235        if size == 0 {
236            return Ok(());
237        }
238
239        gst_trace!(CAT, "Flushing {} bytes, have {}", size, self.size);
240
241        let mut left = size;
242        while left > 0 {
243            let front_size = self.deque.front().unwrap().get_size() - self.skip;
244
245            if front_size <= left {
246                gst_trace!(
247                    CAT,
248                    "Flushing whole {:?}, {} more to go",
249                    self.deque.front().map(|b| b.get_buffer()),
250                    left - front_size
251                );
252                self.deque.pop_front();
253                self.size -= front_size;
254                self.skip = 0;
255                left -= front_size;
256            } else {
257                gst_trace!(
258                    CAT,
259                    "Flushing partial {:?}, {} more left",
260                    self.deque.front().map(|b| b.get_buffer()),
261                    front_size - left
262                );
263                self.skip += left;
264                self.size -= left;
265                left = 0;
266            }
267        }
268
269        Ok(())
270    }
271}
272
273impl io::Read for Adapter {
274    fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
275        let mut len = self.size;
276
277        if len == 0 {
278            return Err(io::Error::new(
279                io::ErrorKind::WouldBlock,
280                format!(
281                    "Missing data: requesting {} but only got {}.",
282                    buf.len(),
283                    len
284                ),
285            ));
286        }
287
288        if buf.len() < len {
289            len = buf.len();
290        }
291
292        Self::copy_data(&self.deque, self.skip, buf, len);
293        self.flush(len).unwrap();
294        Ok(len)
295    }
296}
297
298#[cfg(test)]
299mod tests {
300    use super::*;
301    use gst;
302
303    #[test]
304    fn test_push_get() {
305        gst::init().unwrap();
306
307        let mut a = Adapter::new();
308
309        a.push(gst::Buffer::with_size(100).unwrap());
310        assert_eq!(a.get_available(), 100);
311        a.push(gst::Buffer::with_size(20).unwrap());
312        assert_eq!(a.get_available(), 120);
313
314        let b = a.get_buffer(20).unwrap();
315        assert_eq!(a.get_available(), 100);
316        assert_eq!(b.get_size(), 20);
317        let b = a.get_buffer(90).unwrap();
318        assert_eq!(a.get_available(), 10);
319        assert_eq!(b.get_size(), 90);
320
321        a.push(gst::Buffer::with_size(20).unwrap());
322        assert_eq!(a.get_available(), 30);
323
324        let b = a.get_buffer(20).unwrap();
325        assert_eq!(a.get_available(), 10);
326        assert_eq!(b.get_size(), 20);
327        let b = a.get_buffer(10).unwrap();
328        assert_eq!(a.get_available(), 0);
329        assert_eq!(b.get_size(), 10);
330
331        let b = a.get_buffer(1);
332        assert_eq!(b.err().unwrap(), AdapterError::NotEnoughData);
333    }
334}