lepton_mux/
mux.rs

1use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
2use core;
3
4use interface::{ReadableBytes, StreamDemuxer, StreamID, StreamMuxer, WritableBytes,
5                MAX_NUM_STREAM, STREAM_ID_MASK};
6use util::AllocatedMemoryRange;
7
/// Three-byte sentinel emitted after all stream data to mark the end of the mux.
pub const EOF_MARKER: [u8; 3] = [0xff, 0xfe, 0xff];
// Size of the largest chunk header (the 3-byte variable-length form); buffers
// keep this much slack before the read cursor so a header can be written in place.
const MAX_HEADER_SIZE: usize = 3;
// A stream whose flush point trails another's by more than this many bytes is
// treated as lagging and flushed in smaller chunks so it can catch up.
const MAX_FLUSH_VARIANCE: usize = 131073;
11
// Parser state carried across `deserialize` calls when a chunk header or its
// payload straddles an input-buffer boundary.
enum BytesToDeserialize {
    // Not inside a chunk: the next input byte is a header byte (or EOF marker).
    None,
    // Inside a chunk: this many payload bytes are still owed to the stream.
    Some(StreamID, u32),
    // Saw byte 0 of a 3-byte variable-length header; both length bytes pending.
    Header0(StreamID),
    // Saw the stream byte and the low length byte; high length byte pending.
    Header1(StreamID, u8),
}
18
// Progress through the 3-byte EOF marker, used both when writing it out
// (mux `flush`) and when recognizing it (demux `deserialize_eof`).
enum StreamState {
    Running,
    EofStart, // first EOF byte written/seen
    EofMid,   // second EOF byte written/seen
    EofDone,  // full marker handled; no further data flows
}
25
// Chunk header written immediately before each slice of stream data.
#[derive(Debug)]
enum MuxSliceHeader {
    // 3-byte variable-length header: [stream_id, (len-1) low byte, (len-1) high byte].
    Var([u8; MAX_HEADER_SIZE]),
    // 1-byte fixed-size header: stream id in the low nibble, power-of-two size
    // code in the high nibble (1 => 4096, 2 => 16384, 3 => 65536).
    Fixed([u8; 1]),
}
31
/// How many bytes a stream should accumulate before it is worth emitting a
/// chunk for it.  A lagging stream flushes tiny 16-byte chunks so it can catch
/// up quickly; otherwise the threshold ramps up with how much has already been
/// flushed for the stream.
fn chunk_size(last_flushed: usize, lagging_stream: bool) -> usize {
    if lagging_stream {
        16
    } else if last_flushed <= 1024 {
        4096
    } else if last_flushed <= 65536 {
        16384
    } else {
        65536
    }
}
44
45fn get_mux_header(
46    stream_id: StreamID,
47    n_bytes_to_write: usize,
48    is_lagging: bool,
49) -> (MuxSliceHeader, usize) {
50    //eprintln!("want to: {},{},", stream_id, bytes_to_write);
51    if is_lagging == false || n_bytes_to_write == 4096 || n_bytes_to_write == 16384
52        || n_bytes_to_write >= 65536
53    {
54        if n_bytes_to_write < 4096 {
55            return get_mux_header(stream_id, n_bytes_to_write, true);
56        }
57        if n_bytes_to_write < 16384 {
58            //eprintln!("({},{})", stream_id, 4096);
59            return (MuxSliceHeader::Fixed([stream_id as u8 | (1 << 4)]), 4096);
60        }
61        if n_bytes_to_write < 65536 {
62            //eprintln!("({},{})", stream_id, 16384);
63            return (MuxSliceHeader::Fixed([stream_id as u8 | (2 << 4)]), 16384);
64        }
65        //eprintln!("({},{})", stream_id, 65536);
66        return (MuxSliceHeader::Fixed([stream_id as u8 | (3 << 4)]), 65536);
67    }
68    assert!(n_bytes_to_write < 65536);
69    //eprintln!("({},{})", stream_id, bytes_to_write);
70    let ret = [
71        stream_id,
72        (n_bytes_to_write - 1) as u8,
73        ((n_bytes_to_write - 1) >> 8) as u8,
74    ];
75    return (MuxSliceHeader::Var(ret), n_bytes_to_write);
76}
77
pub struct Mux<AllocU8: Allocator<u8>> {
    // Per-stream staging buffers; `range.start` is the read cursor and
    // `range.end` the write cursor within `mem`.
    buf: Vec<AllocatedMemoryRange<u8, AllocU8>>,
    // Stream whose serialized chunk was cut short by a full output buffer.
    cur_stream: StreamID,
    // Bytes of that chunk still owed to the output (drained by serialize_leftover).
    cur_stream_bytes_avail: usize,
    // Total number of bytes the mux has emitted so far.
    bytes_flushed: usize,
    // Progress through writing/recognizing the EOF marker.
    stream_state: StreamState,
    // Per stream: the value of `bytes_flushed` when that stream last finished
    // flushing — used to detect streams that are lagging behind the others.
    last_flush: Vec<usize>,
    // Partially parsed chunk header/payload state carried across deserialize calls.
    bytes_to_deserialize: BytesToDeserialize,
}
89
impl<AllocU8: Allocator<u8>> Default for Mux<AllocU8> {
    /// A mux over two streams.
    fn default() -> Self {
        Self::new(2)
    }
}
95
96impl<AllocU8: Allocator<u8>> StreamMuxer<AllocU8> for Mux<AllocU8> {
97    fn write(&mut self, stream_id: StreamID, data: &[u8], alloc_u8: &mut AllocU8) -> usize {
98        self.push_data(stream_id, data, alloc_u8);
99        data.len()
100    }
101
102    fn write_buffer(&mut self, stream_id: StreamID, alloc_u8: &mut AllocU8) -> WritableBytes {
103        const MIN_BYTES: usize = 16;
104        self.prep_push_for_n_bytes(stream_id, MIN_BYTES, alloc_u8);
105        let buf = &mut self.buf[stream_id as usize];
106        WritableBytes {
107            data: buf.mem.slice_mut(),
108            write_offset: &mut buf.range.end,
109        }
110    }
111
112    fn serialize(&mut self, output: &mut [u8]) -> usize {
113        let mut output_offset = 0usize;
114        if self.cur_stream_bytes_avail != 0 {
115            output_offset += self.serialize_leftover(output);
116        }
117        while output_offset < output.len() {
118            let mut flushed_any = false;
119            let mut min_flush = self.last_flush[0];
120            let mut max_flush = self.last_flush[0];
121            for lf in self.last_flush[1..].iter() {
122                if *lf < min_flush {
123                    min_flush = *lf;
124                }
125                if *lf > max_flush {
126                    max_flush = *lf;
127                }
128            }
129            for index in 0..self.n_stream() {
130                let mut is_lagging = self.last_flush[index] + MAX_FLUSH_VARIANCE < max_flush;
131                if self.write_cursor(index) - self.read_cursor(index)
132                    >= chunk_size(self.last_flush[index], is_lagging)
133                    && self.last_flush[index] <= min_flush + MAX_FLUSH_VARIANCE
134                {
135                    flushed_any = true;
136                    self.serialize_stream_id(
137                        index as StreamID,
138                        output,
139                        &mut output_offset,
140                        is_lagging,
141                    );
142                    if self.cur_stream_bytes_avail != 0 {
143                        break;
144                    }
145                }
146            }
147            if !flushed_any {
148                break;
149            }
150        }
151        output_offset
152    }
153
154    fn flush(&mut self, output: &mut [u8]) -> usize {
155        match self.stream_state {
156            StreamState::EofDone => return 0,
157            _ => {}
158        }
159        let mut ret = self.flush_internal(output);
160        if ret == output.len() {
161            return ret;
162        }
163        match self.stream_state {
164            StreamState::Running => {
165                output[ret] = EOF_MARKER[0];
166                ret += 1;
167                self.stream_state = StreamState::EofStart;
168            }
169            _ => {}
170        }
171        if ret == output.len() {
172            return ret;
173        }
174        match self.stream_state {
175            StreamState::EofStart => {
176                output[ret] = EOF_MARKER[1];
177                ret += 1;
178                self.stream_state = StreamState::EofMid;
179            }
180            _ => {}
181        }
182        if ret == output.len() {
183            return ret;
184        }
185        match self.stream_state {
186            StreamState::EofMid => {
187                output[ret] = EOF_MARKER[2];
188                ret += 1;
189                self.stream_state = StreamState::EofDone;
190            }
191            _ => {}
192        }
193        return ret;
194    }
195
196    fn wrote_eof(&self) -> bool {
197        self.is_eof()
198    }
199
200    fn free(&mut self, alloc_u8: &mut AllocU8) {
201        self.free(alloc_u8);
202    }
203
204    fn n_stream(&self) -> usize {
205        self.n_stream()
206    }
207}
208
impl<AllocU8: Allocator<u8>> StreamDemuxer<AllocU8> for Mux<AllocU8> {
    /// Consumes serialized mux bytes, routing each chunk's payload into the
    /// buffer of the stream named by its header.  Header/payload state that
    /// straddles the end of `data` is carried in `self.bytes_to_deserialize`
    /// so parsing resumes seamlessly on the next call.  Returns how many
    /// bytes of `data` were consumed.
    fn deserialize(&mut self, data: &[u8], alloc_u8: &mut AllocU8) -> usize {
        let mut input = data;
        let mut ret = 0usize;
        // Loop until the input is exhausted or the full EOF marker was seen.
        while input.len() != 0 && match self.stream_state {
            StreamState::EofDone => false,
            _ => true,
        } {
            match self.bytes_to_deserialize {
                BytesToDeserialize::Header0(stream_id) => {
                    // Byte 2 of the 3-byte header: low byte of (len - 1).
                    self.bytes_to_deserialize = BytesToDeserialize::Header1(stream_id, input[0]);
                    return ret + 1 + self.deserialize(input.split_at(1).1, alloc_u8);
                }
                BytesToDeserialize::Header1(stream_id, lsb) => {
                    // Byte 3 of the 3-byte header: high byte of (len - 1);
                    // the payload length is stored off-by-one so add 1 back.
                    self.bytes_to_deserialize = BytesToDeserialize::Some(
                        stream_id,
                        (lsb as u32 | (input[0] as u32) << 8) + 1,
                    );
                    //eprint!("{}) Deserializing {}\n", stream_id, (lsb as u32 | (input[0] as u32) << 8) + 1);
                    //eprintln!("({},{}),", stream_id, (lsb as u32 | (input[0] as u32) << 8) + 1);
                    return ret + 1 + self.deserialize(input.split_at(1).1, alloc_u8);
                }
                BytesToDeserialize::Some(stream_id, count) => {
                    // Payload: copy as much as the input holds, remembering
                    // any shortfall for the next call.
                    if count as usize > input.len() {
                        self.push_data(stream_id, input, alloc_u8);
                        self.bytes_to_deserialize =
                            BytesToDeserialize::Some(stream_id, count - input.len() as u32);
                        return ret + input.len();
                    }
                    let (to_push, remainder) = input.split_at(count as usize);
                    self.push_data(stream_id, to_push, alloc_u8);
                    input = remainder;
                    self.bytes_to_deserialize = BytesToDeserialize::None;
                    ret += to_push.len();
                }
                BytesToDeserialize::None => {
                    // Candidate EOF byte?  Only treat it as EOF when it is the
                    // first marker byte or we are already mid-marker; otherwise
                    // 0xfe/0xff fall through and parse as a header byte.
                    if input[0] == EOF_MARKER[0] || input[0] == EOF_MARKER[1]
                        || input[0] == EOF_MARKER[2]
                    {
                        if input[0] == EOF_MARKER[0] || match self.stream_state {
                            StreamState::Running => false,
                            _ => true,
                        } {
                            //eprint!("DESERIALIZING EOF\n");
                            return ret + self.deserialize_eof(input);
                        }
                    }
                    let stream_id = input[0] & STREAM_ID_MASK;
                    let count: usize;
                    let bytes_to_copy: u32;
                    if input[0] < 16 {
                        // High nibble clear: 3-byte variable-length header
                        // (MuxSliceHeader::Var) with an explicit payload size.
                        if input.len() < 3 {
                            self.bytes_to_deserialize = BytesToDeserialize::Header0(stream_id);
                            return ret + 1 + self.deserialize(input.split_at(1).1, alloc_u8);
                        }
                        count = 3;
                        bytes_to_copy = (input[1] as u32 | (input[2] as u32) << 8) + 1;
                    //eprintln!("({},{}),", stream_id, bytes_to_copy);
                    } else {
                        // High nibble set: 1-byte fixed-size header
                        // (MuxSliceHeader::Fixed); size code 1/2/3 decodes to
                        // 4096/16384/65536.
                        count = 1;
                        bytes_to_copy = 1024 << ((input[0] >> 4) << 1);
                        //eprintln!("({},{}),", stream_id, bytes_to_copy);
                    }
                    //eprint!("{}) Deserializing {}\n", stream_id, bytes_to_copy);
                    self.bytes_to_deserialize = BytesToDeserialize::Some(stream_id, bytes_to_copy);
                    input = input.split_at(count).1;
                    ret += count;
                }
            }
        }
        ret
    }

    /// Exposes the stream's buffered-but-unread bytes; the caller advances
    /// `read_offset` to consume them.
    fn read_buffer(&mut self, stream_id: StreamID) -> ReadableBytes {
        let buf = &mut self.buf[stream_id as usize];
        ReadableBytes {
            data: buf.mem.slice().split_at(buf.range.end).0,
            read_offset: &mut buf.range.start,
        }
    }

    /// Number of buffered bytes not yet consumed for `stream_id`.
    fn data_len(&self, stream_id: StreamID) -> usize {
        self.write_cursor(usize::from(stream_id)) - self.read_cursor(usize::from(stream_id))
    }

    /// Immutable view of the stream's unread bytes.
    fn data(&self, stream_id: StreamID) -> &[u8] {
        &self.buf[usize::from(stream_id)].slice()
    }

    /// Direct mutable access to the stream's backing range.
    fn editable_data(&mut self, stream_id: StreamID) -> &mut AllocatedMemoryRange<u8, AllocU8> {
        &mut self.buf[usize::from(stream_id)]
    }

    /// Marks `count` bytes of the stream as consumed by advancing the read cursor.
    fn consume_data(&mut self, stream_id: StreamID, count: usize) {
        self.buf[usize::from(stream_id)].range.start += count;
    }

    /// True when every stream is drained and the EOF marker was fully seen.
    fn consumed_all_streams_until_eof(&self) -> bool {
        self.is_eof()
    }

    /// True once the complete 3-byte EOF marker has been parsed.
    fn encountered_eof(&self) -> bool {
        match self.stream_state {
            StreamState::EofDone => true,
            _ => false,
        }
    }

    /// Delegates to the inherent `Mux::free` (inherent methods win resolution).
    fn free(&mut self, alloc_u8: &mut AllocU8) {
        self.free(alloc_u8)
    }

    /// Delegates to the inherent `Mux::n_stream`.
    fn n_stream(&self) -> usize {
        self.n_stream()
    }
}
327
impl<AllocU8: Allocator<u8>> Mux<AllocU8> {
    /// Creates a mux with `n_stream` empty stream buffers.
    pub fn new(n_stream: usize) -> Self {
        assert!(n_stream <= MAX_NUM_STREAM);
        let mut buf = Vec::with_capacity(n_stream);
        for _ in 0..n_stream {
            buf.push(AllocatedMemoryRange::default());
        }
        Mux::<AllocU8> {
            buf,
            cur_stream: 0,
            cur_stream_bytes_avail: 0,
            bytes_flushed: 0,
            last_flush: vec![0; n_stream],
            stream_state: StreamState::Running,
            bytes_to_deserialize: BytesToDeserialize::None,
        }
    }

    /// Number of streams this mux was created with.
    pub fn n_stream(&self) -> usize {
        self.buf.len()
    }

    /// Offset of the next unread byte in stream `index`'s buffer.
    pub fn read_cursor(&self, index: usize) -> usize {
        self.buf[index].range.start
    }

    /// Offset one past the last written byte in stream `index`'s buffer.
    pub fn write_cursor(&self, index: usize) -> usize {
        self.buf[index].range.end
    }

    /// True when every stream buffer is drained and the EOF marker has been
    /// fully written (mux side) or parsed (demux side).
    pub fn is_eof(&self) -> bool {
        for index in 0..self.n_stream() {
            if self.read_cursor(index) != self.write_cursor(index) {
                return false;
            }
        }
        match self.stream_state {
            StreamState::EofDone => true,
            _ => false,
        }
    }

    /// Preallocates `amount_per_stream` bytes for each stream buffer; only
    /// valid while every buffer is still empty.
    pub fn prealloc(&mut self, alloc_u8: &mut AllocU8, amount_per_stream: usize) {
        for buf in self.buf.iter_mut() {
            assert_eq!(buf.mem.slice().len(), 0);
            let mfd = core::mem::replace(&mut buf.mem, alloc_u8.alloc_cell(amount_per_stream));
            alloc_u8.free_cell(mfd);
        }
    }

    /// Returns every stream buffer's memory to the allocator.
    pub fn free(&mut self, alloc_u8: &mut AllocU8) {
        for buf in self.buf.iter_mut() {
            alloc_u8.free_cell(core::mem::replace(
                &mut buf.mem,
                AllocU8::AllocatedMemory::default(),
            ));
        }
    }

    /// Pushes data from a source into the stream buffer specified by `stream_id`.
    /// This data may later be serialized through `serialize` or else consumed
    /// through `data` or `consume`.
    pub fn push_data(&mut self, stream_id: StreamID, data: &[u8], alloc_u8: &mut AllocU8) {
        let (buf, offset) = self.prep_push_for_n_bytes(stream_id, data.len(), alloc_u8);
        Self::unchecked_push(buf.slice_mut(), offset, data)
    }

    /// Copies `data` into `buf` at `write_cursor` and advances the cursor.
    /// Caller must already have ensured enough tail space exists
    /// (see `prep_push_for_n_bytes`), hence "unchecked".
    fn unchecked_push(buf: &mut [u8], write_cursor: &mut usize, data: &[u8]) {
        buf.split_at_mut(*write_cursor)
            .1
            .split_at_mut(data.len())
            .0
            .clone_from_slice(data);
        *write_cursor += data.len();
    }

    /// Makes sure stream `stream_id`'s buffer can accept `data_len` more
    /// bytes, by (in order of preference): using existing tail space, sliding
    /// the unread bytes toward the front, or reallocating to a larger
    /// power-of-two buffer.  Always preserves MAX_HEADER_SIZE slack before the
    /// read cursor so a chunk header can later be written in place.  Returns
    /// the buffer and its write cursor.
    fn prep_push_for_n_bytes(
        &mut self,
        stream_id: StreamID,
        data_len: usize,
        alloc_u8: &mut AllocU8,
    ) -> (&mut AllocU8::AllocatedMemory, &mut usize) {
        //let mut write_cursor = &mut self.write_cursor[stream_id as usize];
        let buf_entry = &mut self.buf[usize::from(stream_id)];
        let write_cursor = &mut buf_entry.range.end;
        let read_cursor = &mut buf_entry.range.start;
        let buf = &mut buf_entry.mem;
        // if there's space in the buffer, simply return it
        if buf.slice().len() - *write_cursor >= data_len {
            return (buf, write_cursor);
        }
        // if there's too much room at the beginning and the new data fits, then move everything to the beginning
        if buf.slice().len() >= (*write_cursor - *read_cursor) + data_len + MAX_HEADER_SIZE
            && (*read_cursor == *write_cursor
                || (*read_cursor >= 16384
                    && *read_cursor > *write_cursor - *read_cursor + MAX_HEADER_SIZE))
        {
            {
                let (unbuffered_empty_half, full_half) = buf.slice_mut().split_at_mut(*read_cursor);
                let empty_half = unbuffered_empty_half.split_at_mut(MAX_HEADER_SIZE).1; // leave some room on the beginning side for header data to be flushed
                let amount_of_data_to_copy = *write_cursor - *read_cursor;
                empty_half
                    .split_at_mut(amount_of_data_to_copy)
                    .0
                    .clone_from_slice(full_half.split_at(amount_of_data_to_copy).0);
                *write_cursor = MAX_HEADER_SIZE + amount_of_data_to_copy;
                *read_cursor = MAX_HEADER_SIZE;
            }
            return (buf, write_cursor);
        }
        // find the next power of two buffer size that could hold everything including the recently added data
        let desired_size: u64 =
            (MAX_HEADER_SIZE + data_len + (*write_cursor - *read_cursor)) as u64;
        let log_desired_size = (64 - desired_size.leading_zeros()) + 1;
        // allocate space for new data and copy in the current data
        // (minimum allocation is 1 << 9 = 512 bytes)
        let mut new_buf = alloc_u8.alloc_cell(1 << core::cmp::max(log_desired_size, 9));
        debug_assert!(new_buf.slice().len() >= *write_cursor - *read_cursor + data_len);
        new_buf
            .slice_mut()
            .split_at_mut(MAX_HEADER_SIZE)
            .1
            .split_at_mut(*write_cursor - *read_cursor)
            .0
            .clone_from_slice(
                buf.slice()
                    .split_at(*read_cursor)
                    .1
                    .split_at(*write_cursor - *read_cursor)
                    .0,
            );
        *write_cursor = MAX_HEADER_SIZE + *write_cursor - *read_cursor;
        *read_cursor = MAX_HEADER_SIZE;
        // swap in the new buffer and release the old one
        alloc_u8.free_cell(core::mem::replace(buf, new_buf));
        (buf, write_cursor)
    }

    /// copy the remaining data from a previous serialize
    fn serialize_leftover(&mut self, output: &mut [u8]) -> usize {
        let to_copy = core::cmp::min(self.cur_stream_bytes_avail, output.len());
        output.split_at_mut(to_copy).0.clone_from_slice(
            self.buf[usize::from(self.cur_stream)]
                .mem
                .slice()
                .split_at(self.read_cursor(usize::from(self.cur_stream)))
                .1
                .split_at(to_copy)
                .0,
        );
        self.buf[usize::from(self.cur_stream)].range.start += to_copy;
        self.cur_stream_bytes_avail -= to_copy;
        to_copy
    }

    /// Emits one chunk for `stream_id` into `output` at `*output_offset`:
    /// chooses a header, writes it into the slack bytes just before the read
    /// cursor, then copies as much of header+payload as fits.  Any remainder
    /// is recorded in `cur_stream`/`cur_stream_bytes_avail` so the next
    /// `serialize_leftover` call can finish the chunk.
    fn serialize_stream_id(
        &mut self,
        stream_id: StreamID,
        output: &mut [u8],
        output_offset: &mut usize,
        is_lagging: bool,
    ) {
        let buf_entry = &mut self.buf[usize::from(stream_id)];
        let write_cursor = &mut buf_entry.range.end;
        let read_cursor = &mut buf_entry.range.start;
        let buf = &mut buf_entry.mem.slice_mut();

        // find the header and number of bytes that should be written to it
        let (header, mut n_bytes_to_write) =
            get_mux_header(stream_id, *write_cursor - *read_cursor, is_lagging);
        //eprint!("{}) header {:?} bytes: {}\n", stream_id, header, num_bytes_should_write);
        self.bytes_flushed += n_bytes_to_write;
        assert!(*read_cursor >= MAX_HEADER_SIZE);
        match header {
            MuxSliceHeader::Var(hdr) => {
                // add on the number of bytes that should be written
                n_bytes_to_write += hdr.len();
                // subtract the location of the buffer...this should not bring us below zero
                *read_cursor -= hdr.len();
                for i in 0..hdr.len() {
                    buf[*read_cursor + i] = hdr[i];
                }
            }
            MuxSliceHeader::Fixed(hdr) => {
                // same as above but for the 1-byte fixed-size header
                n_bytes_to_write += hdr.len();
                *read_cursor -= hdr.len();
                for i in 0..hdr.len() {
                    buf[*read_cursor + i] = hdr[i];
                }
            }
        }
        // set bytes_flushed to the end of the desired bytes to flush, so we know this stream isn't lagging too badly
        self.last_flush[usize::from(stream_id)] = self.bytes_flushed;
        // compute the number of bytes that will fit into output
        let to_write = core::cmp::min(n_bytes_to_write, output.len() - *output_offset);
        output
            .split_at_mut(*output_offset)
            .1
            .split_at_mut(to_write)
            .0
            .clone_from_slice(buf.split_at(*read_cursor).1.split_at(to_write).0);
        *read_cursor += to_write;
        // if we have produced everything from this stream, reset the cursors to the beginning to support quick copies
        if *read_cursor == *write_cursor {
            *read_cursor = MAX_HEADER_SIZE;
            *write_cursor = *read_cursor; // reset cursors to the beginning of the buffer
        }
        *output_offset += to_write;
        // we have some leftovers that would not fit into the output buffer..store these for the next serialize_leftovers call
        if to_write != n_bytes_to_write {
            self.cur_stream_bytes_avail = n_bytes_to_write - to_write;
            self.cur_stream = stream_id as StreamID;
        }
    }

    /// Advances the EOF state machine by matching marker bytes at the front of
    /// `input`, one state per byte, and returns how many bytes were consumed.
    /// Stops early (without consuming) on any byte that does not match the
    /// expected marker byte for the current state.
    fn deserialize_eof(&mut self, mut input: &[u8]) -> usize {
        let mut ret = 0usize;
        assert_eq!(EOF_MARKER.len(), 3);
        match self.stream_state {
            StreamState::Running => {
                if input[0] == EOF_MARKER[0] {
                    ret += 1;
                    input = input.split_at(1).1;
                    self.stream_state = StreamState::EofStart;
                }
            }
            _ => {}
        }
        if input.len() == 0 {
            return ret;
        }
        match self.stream_state {
            StreamState::EofStart => {
                if input[0] == EOF_MARKER[1] {
                    ret += 1;
                    input = input.split_at(1).1;
                    self.stream_state = StreamState::EofMid
                }
            }
            _ => {}
        }
        if input.len() == 0 {
            return ret;
        }
        match self.stream_state {
            StreamState::EofMid => {
                if input[0] == EOF_MARKER[2] {
                    ret += 1;
                    self.stream_state = StreamState::EofDone;
                    return ret;
                }
            }
            _ => {}
        }
        return ret;
    }

    /// Drains every stream into `output` regardless of chunk-size thresholds
    /// (unlike `serialize`, which waits for full chunks), still honoring the
    /// MAX_FLUSH_VARIANCE fairness bound between streams.  Returns bytes written.
    fn flush_internal(&mut self, output: &mut [u8]) -> usize {
        let mut output_offset = 0usize;
        // finish any chunk cut short by a previously full output buffer
        if self.cur_stream_bytes_avail != 0 {
            output_offset += self.serialize_leftover(output);
        }
        while output_offset < output.len() {
            let mut flushed_any = false;
            let mut last_flush: Option<usize> = None;
            for (lf, buf) in self.last_flush.iter().zip(self.buf.iter()) {
                let rc = buf.range.start;
                let wc = buf.range.end;
                if match last_flush {
                    None => rc != wc, // only consider this item for being the last flush point if it has data to flush
                    Some(last_flush_some) => *lf < last_flush_some && rc != wc,
                } {
                    last_flush = Some(*lf);
                }
            }
            for index in 0..self.n_stream() {
                if match last_flush {
                    None => true,
                    Some(last_flush_some) => {
                        self.last_flush[index] <= last_flush_some + MAX_FLUSH_VARIANCE
                    }
                } {
                    let mut written = output_offset;
                    if self.read_cursor(index) != self.write_cursor(index) {
                        self.serialize_stream_id(index as u8, output, &mut written, true);
                    }
                    if written != output_offset {
                        flushed_any = true;
                    }
                    output_offset = written;
                    // output is full; the remainder was saved as leftover
                    if self.cur_stream_bytes_avail != 0 {
                        break;
                    }
                }
            }
            if !flushed_any {
                break;
            }
        }
        output_offset
    }
}
628
/// A muxer/demuxer that carries no data: buffers are zero-length and
/// serialize/deserialize are no-ops.  Useful where a StreamMuxer or
/// StreamDemuxer is required but multiplexing is not wanted.
pub struct DevNull<AllocU8: Allocator<u8>> {
    // One offset per stream, handed out as read/write_offset by the buffer
    // accessors (the associated data slices are always empty).
    cursor: Vec<usize>,
    // Shared empty backing range returned by editable_data.
    empty: AllocatedMemoryRange<u8, AllocU8>,
    _placeholder: core::marker::PhantomData<AllocU8>,
}
634
635impl<AllocU8: Allocator<u8>> DevNull<AllocU8> {
636    pub fn new(n_stream: usize) -> Self {
637        assert!(n_stream <= MAX_NUM_STREAM);
638        DevNull::<AllocU8> {
639            cursor: vec![0; n_stream],
640            empty: AllocatedMemoryRange::default(),
641            _placeholder: core::marker::PhantomData::<AllocU8>::default(),
642        }
643    }
644
645    pub fn n_stream(&self) -> usize {
646        self.cursor.len()
647    }
648}
649
impl<AllocU8: Allocator<u8>> Default for DevNull<AllocU8> {
    /// Two streams, matching `Mux::default()`.
    fn default() -> Self {
        Self::new(2)
    }
}
655
impl<AllocU8: Allocator<u8>> StreamMuxer<AllocU8> for DevNull<AllocU8> {
    /// Hands out a zero-length window; nothing can actually be written.
    fn write_buffer(&mut self, stream_id: StreamID, _alloc_u8: &mut AllocU8) -> WritableBytes {
        WritableBytes {
            data: &mut [],
            write_offset: &mut self.cursor[stream_id as usize],
        }
    }

    /// Accepts only empty writes (debug-asserted) and reports 0 bytes taken.
    fn write(&mut self, _stream_id: StreamID, data: &[u8], _alloc_u8: &mut AllocU8) -> usize {
        debug_assert_eq!(data.len(), 0);
        0
    }

    /// Advertises that serialize() will never produce output.
    fn can_serialize() -> bool {
        false
    }

    /// No data is ever buffered, so nothing is serialized.
    fn serialize(&mut self, _output: &mut [u8]) -> usize {
        0
    }

    /// Nothing to flush; no EOF marker is emitted.
    fn flush(&mut self, _output: &mut [u8]) -> usize {
        0
    }

    /// Trivially complete: there is never pending output.
    fn wrote_eof(&self) -> bool {
        true
    }

    /// Nothing was allocated, so nothing to free.
    fn free(&mut self, _alloc_u8: &mut AllocU8) {}

    fn n_stream(&self) -> usize {
        self.n_stream()
    }
}
691
impl<AllocU8: Allocator<u8>> StreamDemuxer<AllocU8> for DevNull<AllocU8> {
    /// Accepts only empty input (debug-asserted) and consumes 0 bytes.
    fn deserialize(&mut self, data: &[u8], _alloc_u8: &mut AllocU8) -> usize {
        debug_assert_eq!(data.len(), 0);
        0
    }

    /// Hands out an empty read window.
    fn read_buffer(&mut self, stream_id: StreamID) -> ReadableBytes {
        ReadableBytes {
            data: &[],
            read_offset: &mut self.cursor[stream_id as usize],
        }
    }

    /// No stream ever holds data.
    fn data_len(&self, _stream_id: StreamID) -> usize {
        0
    }

    /// Always the empty slice.
    fn data(&self, _stream_id: StreamID) -> &[u8] {
        &[]
    }

    /// All streams share the same empty backing range.
    fn editable_data(&mut self, _stream_id: StreamID) -> &mut AllocatedMemoryRange<u8, AllocU8> {
        &mut self.empty
    }

    /// Only zero-byte consumption is meaningful (debug-asserted).
    fn consume_data(&mut self, _stream_id: StreamID, count: usize) {
        debug_assert_eq!(count, 0);
    }

    /// Trivially true: there is never unconsumed data.
    fn consumed_all_streams_until_eof(&self) -> bool {
        true
    }

    /// Trivially true: the empty stream is always "at EOF".
    fn encountered_eof(&self) -> bool {
        true
    }

    /// Nothing was allocated, so nothing to free.
    fn free(&mut self, _alloc_u8: &mut AllocU8) {}

    fn n_stream(&self) -> usize {
        self.n_stream()
    }
}