1use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
2use core;
3
4use interface::{ReadableBytes, StreamDemuxer, StreamID, StreamMuxer, WritableBytes,
5 MAX_NUM_STREAM, STREAM_ID_MASK};
6use util::AllocatedMemoryRange;
7
/// Three-byte sequence that `flush` appends after all buffered stream data and that
/// `deserialize` recognizes as the end of the multiplexed stream.
pub const EOF_MARKER: [u8; 3] = [0xff, 0xfe, 0xff];
/// Size in bytes of the largest chunk header (the variable-length form). The same amount
/// of headroom is kept in front of buffered data so a header can be written in place.
const MAX_HEADER_SIZE: usize = 3;
/// Largest tolerated gap between the `last_flush` marks of different streams before the
/// scheduler treats a stream as lagging or holds a stream back.
const MAX_FLUSH_VARIANCE: usize = 131073;
11
/// Parser position of the demultiplexer between calls to `deserialize`.
enum BytesToDeserialize {
    /// No chunk is in progress; the next byte is a chunk header or an EOF marker byte.
    None,
    /// Payload bytes still expected for the given stream (stream, remaining byte count).
    Some(StreamID, u32),
    /// The first byte of a variable-length header was read; the low length byte is next.
    Header0(StreamID),
    /// The low length byte (second field) was read; the high length byte is next.
    Header1(StreamID, u8),
}
18
/// Progress through the three-byte `EOF_MARKER`, shared by the writing (`flush`) and the
/// reading (`deserialize_eof`) side.
enum StreamState {
    /// No EOF marker byte has been written or read yet.
    Running,
    /// `EOF_MARKER[0]` has been written or read.
    EofStart,
    /// `EOF_MARKER[1]` has been written or read.
    EofMid,
    /// The complete marker has been written or read.
    EofDone,
}
25
/// Header placed in front of every serialized chunk.
#[derive(Debug)]
enum MuxSliceHeader {
    /// Variable-length chunk: `[stream_id, low byte of len - 1, high byte of len - 1]`.
    Var([u8; MAX_HEADER_SIZE]),
    /// Fixed-size chunk: a single byte holding the stream id OR-ed with a size code
    /// (1, 2 or 3 for 4096, 16384 or 65536 bytes) shifted left by four bits.
    Fixed([u8; 1]),
}
31
/// Returns the minimum number of buffered bytes a stream must hold before `serialize`
/// emits a chunk for it.
///
/// * `last_flushed` - the stream's `last_flush` mark; larger marks select larger chunks.
/// * `lagging_stream` - when `true` the stream is behind the others and only needs 16
///   bytes, regardless of `last_flushed`.
fn chunk_size(last_flushed: usize, lagging_stream: bool) -> usize {
    if lagging_stream {
        16
    } else if last_flushed <= 1024 {
        4096
    } else if last_flushed <= 65536 {
        16384
    } else {
        65536
    }
}
44
45fn get_mux_header(
46 stream_id: StreamID,
47 n_bytes_to_write: usize,
48 is_lagging: bool,
49) -> (MuxSliceHeader, usize) {
50 if is_lagging == false || n_bytes_to_write == 4096 || n_bytes_to_write == 16384
52 || n_bytes_to_write >= 65536
53 {
54 if n_bytes_to_write < 4096 {
55 return get_mux_header(stream_id, n_bytes_to_write, true);
56 }
57 if n_bytes_to_write < 16384 {
58 return (MuxSliceHeader::Fixed([stream_id as u8 | (1 << 4)]), 4096);
60 }
61 if n_bytes_to_write < 65536 {
62 return (MuxSliceHeader::Fixed([stream_id as u8 | (2 << 4)]), 16384);
64 }
65 return (MuxSliceHeader::Fixed([stream_id as u8 | (3 << 4)]), 65536);
67 }
68 assert!(n_bytes_to_write < 65536);
69 let ret = [
71 stream_id,
72 (n_bytes_to_write - 1) as u8,
73 ((n_bytes_to_write - 1) >> 8) as u8,
74 ];
75 return (MuxSliceHeader::Var(ret), n_bytes_to_write);
76}
77
/// Multiplexer/demultiplexer that buffers several byte streams and interleaves them into
/// (or recovers them from) a single stream of header-prefixed chunks.
pub struct Mux<AllocU8: Allocator<u8>> {
    /// One buffer per stream; `range.start` is the read cursor, `range.end` the write cursor.
    buf: Vec<AllocatedMemoryRange<u8, AllocU8>>,
    /// Stream whose chunk was only partly copied to the output by the last serialize call.
    cur_stream: StreamID,
    /// Bytes of that partly emitted chunk still waiting to be copied out.
    cur_stream_bytes_avail: usize,
    /// Running total of payload bytes scheduled for output by `serialize_stream_id`.
    bytes_flushed: usize,
    /// Progress through the EOF marker.
    stream_state: StreamState,
    /// Per stream: the value of `bytes_flushed` when the stream last emitted a chunk.
    last_flush: Vec<usize>,
    /// Demultiplexer parser state carried across `deserialize` calls.
    bytes_to_deserialize: BytesToDeserialize,
}
89
90impl<AllocU8: Allocator<u8>> Default for Mux<AllocU8> {
91 fn default() -> Self {
92 Self::new(2)
93 }
94}
95
96impl<AllocU8: Allocator<u8>> StreamMuxer<AllocU8> for Mux<AllocU8> {
97 fn write(&mut self, stream_id: StreamID, data: &[u8], alloc_u8: &mut AllocU8) -> usize {
98 self.push_data(stream_id, data, alloc_u8);
99 data.len()
100 }
101
102 fn write_buffer(&mut self, stream_id: StreamID, alloc_u8: &mut AllocU8) -> WritableBytes {
103 const MIN_BYTES: usize = 16;
104 self.prep_push_for_n_bytes(stream_id, MIN_BYTES, alloc_u8);
105 let buf = &mut self.buf[stream_id as usize];
106 WritableBytes {
107 data: buf.mem.slice_mut(),
108 write_offset: &mut buf.range.end,
109 }
110 }
111
112 fn serialize(&mut self, output: &mut [u8]) -> usize {
113 let mut output_offset = 0usize;
114 if self.cur_stream_bytes_avail != 0 {
115 output_offset += self.serialize_leftover(output);
116 }
117 while output_offset < output.len() {
118 let mut flushed_any = false;
119 let mut min_flush = self.last_flush[0];
120 let mut max_flush = self.last_flush[0];
121 for lf in self.last_flush[1..].iter() {
122 if *lf < min_flush {
123 min_flush = *lf;
124 }
125 if *lf > max_flush {
126 max_flush = *lf;
127 }
128 }
129 for index in 0..self.n_stream() {
130 let mut is_lagging = self.last_flush[index] + MAX_FLUSH_VARIANCE < max_flush;
131 if self.write_cursor(index) - self.read_cursor(index)
132 >= chunk_size(self.last_flush[index], is_lagging)
133 && self.last_flush[index] <= min_flush + MAX_FLUSH_VARIANCE
134 {
135 flushed_any = true;
136 self.serialize_stream_id(
137 index as StreamID,
138 output,
139 &mut output_offset,
140 is_lagging,
141 );
142 if self.cur_stream_bytes_avail != 0 {
143 break;
144 }
145 }
146 }
147 if !flushed_any {
148 break;
149 }
150 }
151 output_offset
152 }
153
154 fn flush(&mut self, output: &mut [u8]) -> usize {
155 match self.stream_state {
156 StreamState::EofDone => return 0,
157 _ => {}
158 }
159 let mut ret = self.flush_internal(output);
160 if ret == output.len() {
161 return ret;
162 }
163 match self.stream_state {
164 StreamState::Running => {
165 output[ret] = EOF_MARKER[0];
166 ret += 1;
167 self.stream_state = StreamState::EofStart;
168 }
169 _ => {}
170 }
171 if ret == output.len() {
172 return ret;
173 }
174 match self.stream_state {
175 StreamState::EofStart => {
176 output[ret] = EOF_MARKER[1];
177 ret += 1;
178 self.stream_state = StreamState::EofMid;
179 }
180 _ => {}
181 }
182 if ret == output.len() {
183 return ret;
184 }
185 match self.stream_state {
186 StreamState::EofMid => {
187 output[ret] = EOF_MARKER[2];
188 ret += 1;
189 self.stream_state = StreamState::EofDone;
190 }
191 _ => {}
192 }
193 return ret;
194 }
195
    /// Reports whether every stream buffer is drained and the full EOF marker has been
    /// emitted; delegates to `Mux::is_eof`.
    fn wrote_eof(&self) -> bool {
        self.is_eof()
    }
199
    /// Returns all stream buffers to `alloc_u8`.
    ///
    /// `self.free` resolves to the inherent `Mux::free` (inherent methods take priority
    /// over trait methods), so this is a delegation, not a recursive call.
    fn free(&mut self, alloc_u8: &mut AllocU8) {
        self.free(alloc_u8);
    }
203
    /// Number of streams handled by this multiplexer; delegates to the inherent
    /// `Mux::n_stream`, which takes priority over this trait method.
    fn n_stream(&self) -> usize {
        self.n_stream()
    }
207}
208
209impl<AllocU8: Allocator<u8>> StreamDemuxer<AllocU8> for Mux<AllocU8> {
    /// Parses multiplexed bytes from `data`, appending chunk payloads to the per-stream
    /// buffers, and returns how many input bytes were consumed.
    ///
    /// Parsing stops when the input is exhausted, when the complete EOF marker has been
    /// seen, or right after an EOF-marker attempt. Partial headers and partial payloads
    /// are remembered in `bytes_to_deserialize`, so the input may be split at any byte.
    ///
    /// NOTE(review): the stream id taken from the header is not checked against
    /// `n_stream()` here; an out-of-range id would panic when indexing `self.buf`.
    fn deserialize(&mut self, data: &[u8], alloc_u8: &mut AllocU8) -> usize {
        let mut input = data;
        let mut ret = 0usize;
        while input.len() != 0 && match self.stream_state {
            StreamState::EofDone => false,
            _ => true,
        } {
            match self.bytes_to_deserialize {
                // Second byte of a split variable header: low byte of `len - 1`.
                BytesToDeserialize::Header0(stream_id) => {
                    self.bytes_to_deserialize = BytesToDeserialize::Header1(stream_id, input[0]);
                    return ret + 1 + self.deserialize(input.split_at(1).1, alloc_u8);
                }
                // Third byte of a split variable header: high byte of `len - 1`.
                BytesToDeserialize::Header1(stream_id, lsb) => {
                    self.bytes_to_deserialize = BytesToDeserialize::Some(
                        stream_id,
                        (lsb as u32 | (input[0] as u32) << 8) + 1,
                    );
                    return ret + 1 + self.deserialize(input.split_at(1).1, alloc_u8);
                }
                // In the middle of a payload with `count` bytes outstanding.
                BytesToDeserialize::Some(stream_id, count) => {
                    if count as usize > input.len() {
                        // The payload continues past this input: take everything.
                        self.push_data(stream_id, input, alloc_u8);
                        self.bytes_to_deserialize =
                            BytesToDeserialize::Some(stream_id, count - input.len() as u32);
                        return ret + input.len();
                    }
                    let (to_push, remainder) = input.split_at(count as usize);
                    self.push_data(stream_id, to_push, alloc_u8);
                    input = remainder;
                    self.bytes_to_deserialize = BytesToDeserialize::None;
                    ret += to_push.len();
                }
                // At a chunk boundary: expect a header or an EOF marker byte.
                BytesToDeserialize::None => {
                    if input[0] == EOF_MARKER[0] || input[0] == EOF_MARKER[1]
                        || input[0] == EOF_MARKER[2]
                    {
                        // Only `EOF_MARKER[0]` may start the marker; the other marker
                        // bytes count only once the marker is already in progress.
                        if input[0] == EOF_MARKER[0] || match self.stream_state {
                            StreamState::Running => false,
                            _ => true,
                        } {
                            return ret + self.deserialize_eof(input);
                        }
                    }
                    let stream_id = input[0] & STREAM_ID_MASK;
                    let count: usize;
                    let bytes_to_copy: u32;
                    if input[0] < 16 {
                        // Variable header: stream id plus a 16-bit little-endian `len - 1`.
                        if input.len() < 3 {
                            // Header is split across inputs; continue byte by byte.
                            self.bytes_to_deserialize = BytesToDeserialize::Header0(stream_id);
                            return ret + 1 + self.deserialize(input.split_at(1).1, alloc_u8);
                        }
                        count = 3;
                        bytes_to_copy = (input[1] as u32 | (input[2] as u32) << 8) + 1;
                    } else {
                        // Fixed header: size codes 1, 2, 3 in the high nibble decode to
                        // 4096, 16384, 65536 bytes.
                        count = 1;
                        bytes_to_copy = 1024 << ((input[0] >> 4) << 1);
                    }
                    self.bytes_to_deserialize = BytesToDeserialize::Some(stream_id, bytes_to_copy);
                    input = input.split_at(count).1;
                    ret += count;
                }
            }
        }
        ret
    }
283
284 fn read_buffer(&mut self, stream_id: StreamID) -> ReadableBytes {
285 let buf = &mut self.buf[stream_id as usize];
286 ReadableBytes {
287 data: buf.mem.slice().split_at(buf.range.end).0,
288 read_offset: &mut buf.range.start,
289 }
290 }
291
292 fn data_len(&self, stream_id: StreamID) -> usize {
293 self.write_cursor(usize::from(stream_id)) - self.read_cursor(usize::from(stream_id))
294 }
295
    /// Returns the slice reported by the buffer entry of `stream_id`.
    ///
    /// NOTE(review): presumably the bytes between the read and write cursors — confirm
    /// against `AllocatedMemoryRange::slice`, which is defined outside this file.
    fn data(&self, stream_id: StreamID) -> &[u8] {
        &self.buf[usize::from(stream_id)].slice()
    }
299
    /// Gives mutable access to the whole buffer entry (memory and cursors) of `stream_id`.
    fn editable_data(&mut self, stream_id: StreamID) -> &mut AllocatedMemoryRange<u8, AllocU8> {
        &mut self.buf[usize::from(stream_id)]
    }
303
304 fn consume_data(&mut self, stream_id: StreamID, count: usize) {
305 self.buf[usize::from(stream_id)].range.start += count;
306 }
307
    /// Reports whether the EOF marker was fully read and every stream buffer has been
    /// consumed; delegates to `Mux::is_eof`.
    fn consumed_all_streams_until_eof(&self) -> bool {
        self.is_eof()
    }
311
    /// Reports whether the complete EOF marker has been seen, regardless of whether
    /// buffered stream data is still unconsumed.
    fn encountered_eof(&self) -> bool {
        match self.stream_state {
            StreamState::EofDone => true,
            _ => false,
        }
    }
318
    /// Returns all stream buffers to `alloc_u8`.
    ///
    /// `self.free` resolves to the inherent `Mux::free` (inherent methods take priority
    /// over trait methods), so this is a delegation, not a recursive call.
    fn free(&mut self, alloc_u8: &mut AllocU8) {
        self.free(alloc_u8)
    }
322
    /// Number of streams handled by this demultiplexer; delegates to the inherent
    /// `Mux::n_stream`, which takes priority over this trait method.
    fn n_stream(&self) -> usize {
        self.n_stream()
    }
326}
327
328impl<AllocU8: Allocator<u8>> Mux<AllocU8> {
329 pub fn new(n_stream: usize) -> Self {
330 assert!(n_stream <= MAX_NUM_STREAM);
331 let mut buf = Vec::with_capacity(n_stream);
332 for _ in 0..n_stream {
333 buf.push(AllocatedMemoryRange::default());
334 }
335 Mux::<AllocU8> {
336 buf,
337 cur_stream: 0,
338 cur_stream_bytes_avail: 0,
339 bytes_flushed: 0,
340 last_flush: vec![0; n_stream],
341 stream_state: StreamState::Running,
342 bytes_to_deserialize: BytesToDeserialize::None,
343 }
344 }
345
    /// Number of streams, i.e. the number of per-stream buffers.
    pub fn n_stream(&self) -> usize {
        self.buf.len()
    }
349
    /// Offset of the next unread byte in the buffer of stream `index`.
    pub fn read_cursor(&self, index: usize) -> usize {
        self.buf[index].range.start
    }
353
    /// Offset one past the last written byte in the buffer of stream `index`.
    pub fn write_cursor(&self, index: usize) -> usize {
        self.buf[index].range.end
    }
357
358 pub fn is_eof(&self) -> bool {
359 for index in 0..self.n_stream() {
360 if self.read_cursor(index) != self.write_cursor(index) {
361 return false;
362 }
363 }
364 match self.stream_state {
365 StreamState::EofDone => true,
366 _ => false,
367 }
368 }
369
    /// Allocates `amount_per_stream` bytes up front for every stream buffer.
    ///
    /// Must be called before any buffer has backing memory (asserted). The previous,
    /// empty allocation is handed back to `alloc_u8`. The cursors are not touched.
    ///
    /// NOTE(review): if the cursors start at 0, data pushed into a preallocated buffer
    /// has no `MAX_HEADER_SIZE` headroom in front of it, which `serialize_stream_id`
    /// asserts on — confirm this is only used on the demultiplexing side or that the
    /// default cursors account for it.
    pub fn prealloc(&mut self, alloc_u8: &mut AllocU8, amount_per_stream: usize) {
        for buf in self.buf.iter_mut() {
            assert_eq!(buf.mem.slice().len(), 0);
            let mfd = core::mem::replace(&mut buf.mem, alloc_u8.alloc_cell(amount_per_stream));
            alloc_u8.free_cell(mfd);
        }
    }
377
378 pub fn free(&mut self, alloc_u8: &mut AllocU8) {
379 for buf in self.buf.iter_mut() {
380 alloc_u8.free_cell(core::mem::replace(
381 &mut buf.mem,
382 AllocU8::AllocatedMemory::default(),
383 ));
384 }
385 }
386
387 pub fn push_data(&mut self, stream_id: StreamID, data: &[u8], alloc_u8: &mut AllocU8) {
391 let (buf, offset) = self.prep_push_for_n_bytes(stream_id, data.len(), alloc_u8);
392 Self::unchecked_push(buf.slice_mut(), offset, data)
393 }
394
395 fn unchecked_push(buf: &mut [u8], write_cursor: &mut usize, data: &[u8]) {
396 buf.split_at_mut(*write_cursor)
397 .1
398 .split_at_mut(data.len())
399 .0
400 .clone_from_slice(data);
401 *write_cursor += data.len();
402 }
403
    /// Ensures the buffer of `stream_id` can take `data_len` more bytes at its write
    /// cursor and returns the backing memory together with that write cursor.
    ///
    /// Three strategies are tried in order:
    /// 1. enough free space already follows the write cursor: nothing to do;
    /// 2. the buffer is large enough overall and is either empty or has a large,
    ///    non-overlapping consumed prefix: move the live bytes down to offset
    ///    `MAX_HEADER_SIZE`;
    /// 3. otherwise allocate a bigger buffer, copy the live bytes to offset
    ///    `MAX_HEADER_SIZE`, and free the old buffer.
    ///
    /// After (2) and (3) the read cursor is `MAX_HEADER_SIZE`, leaving room to write a
    /// chunk header directly in front of the data.
    fn prep_push_for_n_bytes(
        &mut self,
        stream_id: StreamID,
        data_len: usize,
        alloc_u8: &mut AllocU8,
    ) -> (&mut AllocU8::AllocatedMemory, &mut usize) {
        let buf_entry = &mut self.buf[usize::from(stream_id)];
        let write_cursor = &mut buf_entry.range.end;
        let read_cursor = &mut buf_entry.range.start;
        let buf = &mut buf_entry.mem;
        // Case 1: the tail of the buffer has enough room.
        if buf.slice().len() - *write_cursor >= data_len {
            return (buf, write_cursor);
        }
        // Case 2: compact in place. Only done when the buffer is empty, or when at least
        // 16384 bytes were consumed and the live bytes fit entirely inside the consumed
        // prefix (after the header headroom), so source and destination cannot overlap.
        if buf.slice().len() >= (*write_cursor - *read_cursor) + data_len + MAX_HEADER_SIZE
            && (*read_cursor == *write_cursor
                || (*read_cursor >= 16384
                    && *read_cursor > *write_cursor - *read_cursor + MAX_HEADER_SIZE))
        {
            {
                let (unbuffered_empty_half, full_half) = buf.slice_mut().split_at_mut(*read_cursor);
                // Skip the headroom reserved for a chunk header.
                let empty_half = unbuffered_empty_half.split_at_mut(MAX_HEADER_SIZE).1;
                let amount_of_data_to_copy = *write_cursor - *read_cursor;
                empty_half
                    .split_at_mut(amount_of_data_to_copy)
                    .0
                    .clone_from_slice(full_half.split_at(amount_of_data_to_copy).0);
                *write_cursor = MAX_HEADER_SIZE + amount_of_data_to_copy;
                *read_cursor = MAX_HEADER_SIZE;
            }
            return (buf, write_cursor);
        }
        // Case 3: reallocate. The new capacity is a power of two more than twice the
        // required size (bit length plus one), with a floor of 512 bytes.
        let desired_size: u64 =
            (MAX_HEADER_SIZE + data_len + (*write_cursor - *read_cursor)) as u64;
        let log_desired_size = (64 - desired_size.leading_zeros()) + 1;
        let mut new_buf = alloc_u8.alloc_cell(1 << core::cmp::max(log_desired_size, 9));
        debug_assert!(new_buf.slice().len() >= *write_cursor - *read_cursor + data_len);
        // Copy the live bytes behind the header headroom of the new buffer.
        new_buf
            .slice_mut()
            .split_at_mut(MAX_HEADER_SIZE)
            .1
            .split_at_mut(*write_cursor - *read_cursor)
            .0
            .clone_from_slice(
                buf.slice()
                    .split_at(*read_cursor)
                    .1
                    .split_at(*write_cursor - *read_cursor)
                    .0,
            );
        // The write cursor must be updated first: it depends on the old read cursor.
        *write_cursor = MAX_HEADER_SIZE + *write_cursor - *read_cursor;
        *read_cursor = MAX_HEADER_SIZE;
        alloc_u8.free_cell(core::mem::replace(buf, new_buf));
        (buf, write_cursor)
    }
463
464 fn serialize_leftover(&mut self, output: &mut [u8]) -> usize {
466 let to_copy = core::cmp::min(self.cur_stream_bytes_avail, output.len());
467 output.split_at_mut(to_copy).0.clone_from_slice(
468 self.buf[usize::from(self.cur_stream)]
469 .mem
470 .slice()
471 .split_at(self.read_cursor(usize::from(self.cur_stream)))
472 .1
473 .split_at(to_copy)
474 .0,
475 );
476 self.buf[usize::from(self.cur_stream)].range.start += to_copy;
477 self.cur_stream_bytes_avail -= to_copy;
478 to_copy
479 }
480
481 fn serialize_stream_id(
482 &mut self,
483 stream_id: StreamID,
484 output: &mut [u8],
485 output_offset: &mut usize,
486 is_lagging: bool,
487 ) {
488 let buf_entry = &mut self.buf[usize::from(stream_id)];
489 let write_cursor = &mut buf_entry.range.end;
490 let read_cursor = &mut buf_entry.range.start;
491 let buf = &mut buf_entry.mem.slice_mut();
492
493 let (header, mut n_bytes_to_write) =
495 get_mux_header(stream_id, *write_cursor - *read_cursor, is_lagging);
496 self.bytes_flushed += n_bytes_to_write;
498 assert!(*read_cursor >= MAX_HEADER_SIZE);
499 match header {
500 MuxSliceHeader::Var(hdr) => {
501 n_bytes_to_write += hdr.len();
503 *read_cursor -= hdr.len();
505 for i in 0..hdr.len() {
506 buf[*read_cursor + i] = hdr[i];
507 }
508 }
509 MuxSliceHeader::Fixed(hdr) => {
510 n_bytes_to_write += hdr.len();
511 *read_cursor -= hdr.len();
512 for i in 0..hdr.len() {
513 buf[*read_cursor + i] = hdr[i];
514 }
515 }
516 }
517 self.last_flush[usize::from(stream_id)] = self.bytes_flushed;
519 let to_write = core::cmp::min(n_bytes_to_write, output.len() - *output_offset);
521 output
522 .split_at_mut(*output_offset)
523 .1
524 .split_at_mut(to_write)
525 .0
526 .clone_from_slice(buf.split_at(*read_cursor).1.split_at(to_write).0);
527 *read_cursor += to_write;
528 if *read_cursor == *write_cursor {
530 *read_cursor = MAX_HEADER_SIZE;
531 *write_cursor = *read_cursor; }
533 *output_offset += to_write;
534 if to_write != n_bytes_to_write {
536 self.cur_stream_bytes_avail = n_bytes_to_write - to_write;
537 self.cur_stream = stream_id as StreamID;
538 }
539 }
540
541 fn deserialize_eof(&mut self, mut input: &[u8]) -> usize {
542 let mut ret = 0usize;
543 assert_eq!(EOF_MARKER.len(), 3);
544 match self.stream_state {
545 StreamState::Running => {
546 if input[0] == EOF_MARKER[0] {
547 ret += 1;
548 input = input.split_at(1).1;
549 self.stream_state = StreamState::EofStart;
550 }
551 }
552 _ => {}
553 }
554 if input.len() == 0 {
555 return ret;
556 }
557 match self.stream_state {
558 StreamState::EofStart => {
559 if input[0] == EOF_MARKER[1] {
560 ret += 1;
561 input = input.split_at(1).1;
562 self.stream_state = StreamState::EofMid
563 }
564 }
565 _ => {}
566 }
567 if input.len() == 0 {
568 return ret;
569 }
570 match self.stream_state {
571 StreamState::EofMid => {
572 if input[0] == EOF_MARKER[2] {
573 ret += 1;
574 self.stream_state = StreamState::EofDone;
575 return ret;
576 }
577 }
578 _ => {}
579 }
580 return ret;
581 }
582
583 fn flush_internal(&mut self, output: &mut [u8]) -> usize {
584 let mut output_offset = 0usize;
585 if self.cur_stream_bytes_avail != 0 {
586 output_offset += self.serialize_leftover(output);
587 }
588 while output_offset < output.len() {
589 let mut flushed_any = false;
590 let mut last_flush: Option<usize> = None;
591 for (lf, buf) in self.last_flush.iter().zip(self.buf.iter()) {
592 let rc = buf.range.start;
593 let wc = buf.range.end;
594 if match last_flush {
595 None => rc != wc, Some(last_flush_some) => *lf < last_flush_some && rc != wc,
597 } {
598 last_flush = Some(*lf);
599 }
600 }
601 for index in 0..self.n_stream() {
602 if match last_flush {
603 None => true,
604 Some(last_flush_some) => {
605 self.last_flush[index] <= last_flush_some + MAX_FLUSH_VARIANCE
606 }
607 } {
608 let mut written = output_offset;
609 if self.read_cursor(index) != self.write_cursor(index) {
610 self.serialize_stream_id(index as u8, output, &mut written, true);
611 }
612 if written != output_offset {
613 flushed_any = true;
614 }
615 output_offset = written;
616 if self.cur_stream_bytes_avail != 0 {
617 break;
618 }
619 }
620 }
621 if !flushed_any {
622 break;
623 }
624 }
625 output_offset
626 }
627}
628
/// Stream muxer/demuxer stand-in that stores no data: writes and reads are expected to
/// be empty and every EOF query answers `true`.
pub struct DevNull<AllocU8: Allocator<u8>> {
    /// One cursor per stream, lent out as the write/read offset of the (empty) buffers.
    cursor: Vec<usize>,
    /// Empty buffer entry returned by `editable_data`.
    empty: AllocatedMemoryRange<u8, AllocU8>,
    /// Marker field naming the allocator type parameter.
    _placeholder: core::marker::PhantomData<AllocU8>,
}
634
635impl<AllocU8: Allocator<u8>> DevNull<AllocU8> {
636 pub fn new(n_stream: usize) -> Self {
637 assert!(n_stream <= MAX_NUM_STREAM);
638 DevNull::<AllocU8> {
639 cursor: vec![0; n_stream],
640 empty: AllocatedMemoryRange::default(),
641 _placeholder: core::marker::PhantomData::<AllocU8>::default(),
642 }
643 }
644
645 pub fn n_stream(&self) -> usize {
646 self.cursor.len()
647 }
648}
649
650impl<AllocU8: Allocator<u8>> Default for DevNull<AllocU8> {
651 fn default() -> Self {
652 Self::new(2)
653 }
654}
655
/// Muxer side of `DevNull`: accepts nothing and produces nothing.
impl<AllocU8: Allocator<u8>> StreamMuxer<AllocU8> for DevNull<AllocU8> {
    /// Returns a zero-length writable buffer whose offset is the stream's cursor.
    fn write_buffer(&mut self, stream_id: StreamID, _alloc_u8: &mut AllocU8) -> WritableBytes {
        WritableBytes {
            data: &mut [],
            write_offset: &mut self.cursor[stream_id as usize],
        }
    }

    /// Accepts no data and always returns 0; debug builds assert that `data` is empty.
    fn write(&mut self, _stream_id: StreamID, data: &[u8], _alloc_u8: &mut AllocU8) -> usize {
        debug_assert_eq!(data.len(), 0);
        0
    }

    /// This muxer cannot serialize anything.
    fn can_serialize() -> bool {
        false
    }

    /// Writes nothing and returns 0.
    fn serialize(&mut self, _output: &mut [u8]) -> usize {
        0
    }

    /// Writes nothing (not even an EOF marker) and returns 0.
    fn flush(&mut self, _output: &mut [u8]) -> usize {
        0
    }

    /// Always reports EOF as written.
    fn wrote_eof(&self) -> bool {
        true
    }

    /// Nothing is allocated, so there is nothing to free.
    fn free(&mut self, _alloc_u8: &mut AllocU8) {}

    /// Number of streams; delegates to the inherent `DevNull::n_stream`, which takes
    /// priority over this trait method.
    fn n_stream(&self) -> usize {
        self.n_stream()
    }
}
691
/// Demuxer side of `DevNull`: every stream is permanently empty and at EOF.
impl<AllocU8: Allocator<u8>> StreamDemuxer<AllocU8> for DevNull<AllocU8> {
    /// Consumes nothing and returns 0; debug builds assert that `data` is empty.
    fn deserialize(&mut self, data: &[u8], _alloc_u8: &mut AllocU8) -> usize {
        debug_assert_eq!(data.len(), 0);
        0
    }

    /// Returns a zero-length readable buffer whose offset is the stream's cursor.
    fn read_buffer(&mut self, stream_id: StreamID) -> ReadableBytes {
        ReadableBytes {
            data: &[],
            read_offset: &mut self.cursor[stream_id as usize],
        }
    }

    /// No stream ever holds data.
    fn data_len(&self, _stream_id: StreamID) -> usize {
        0
    }

    /// Always an empty slice.
    fn data(&self, _stream_id: StreamID) -> &[u8] {
        &[]
    }

    /// Returns the shared empty buffer entry, whatever the stream.
    fn editable_data(&mut self, _stream_id: StreamID) -> &mut AllocatedMemoryRange<u8, AllocU8> {
        &mut self.empty
    }

    /// Nothing can be consumed; debug builds assert that `count` is 0.
    fn consume_data(&mut self, _stream_id: StreamID, count: usize) {
        debug_assert_eq!(count, 0);
    }

    /// Always `true`: there is never pending data.
    fn consumed_all_streams_until_eof(&self) -> bool {
        true
    }

    /// Always `true`.
    fn encountered_eof(&self) -> bool {
        true
    }

    /// Nothing is allocated, so there is nothing to free.
    fn free(&mut self, _alloc_u8: &mut AllocU8) {}

    /// Number of streams; delegates to the inherent `DevNull::n_stream`, which takes
    /// priority over this trait method.
    fn n_stream(&self) -> usize {
        self.n_stream()
    }
}