1use std::{borrow::Borrow, cmp, collections::VecDeque, fmt, io, mem, ops, ptr};
2
3use crate::{BufMut, BytePageSize, ByteString, Bytes, BytesMut};
4use crate::{buf::UninitSlice, storage::StorageVec};
5
6pub struct BytePages {
7 size: BytePageSize,
8 pages: VecDeque<BytePage>,
9 current: StorageVec,
10}
11
12impl BytePages {
13 pub fn new(size: BytePageSize) -> Self {
18 debug_assert!(size != BytePageSize::Unset, "Page cannot be Unset");
19
20 BytePages {
21 size,
22 pages: VecDeque::with_capacity(8),
23 current: StorageVec::sized(size),
24 }
25 }
26
27 pub fn page_size(&self) -> BytePageSize {
29 self.size
30 }
31
32 pub fn set_page_size(&mut self, size: BytePageSize) {
34 self.size = size;
35 }
36
37 pub fn prepend<T>(&mut self, buf: T) -> bool
39 where
40 BytePage: From<T>,
41 {
42 let p = BytePage::from(buf);
43 if p.is_empty() {
44 false
45 } else {
46 self.pages.push_front(p);
47 true
48 }
49 }
50
51 pub fn append<T>(&mut self, buf: T)
53 where
54 BytePage: From<T>,
55 {
56 let p = BytePage::from(buf);
57 let remaining = self.current.remaining();
58
59 if p.len() <= remaining {
60 self.put_slice(p.as_ref());
61 } else if self.current.len() == 0 {
62 match p.into_storage() {
63 Ok(st) => {
64 self.current = st;
65 }
66 Err(page) => {
67 self.pages.push_back(page);
69 }
70 }
71 } else {
72 self.pages.push_back(BytePage {
74 inner: StorageType::Storage(mem::replace(
75 &mut self.current,
76 StorageVec::sized(self.size),
77 )),
78 });
79
80 self.pages.push_back(p);
82 }
83 }
84
85 #[inline]
86 pub fn extend_from_slice(&mut self, extend: &[u8]) {
92 self.put_slice(extend);
93 }
94
95 #[inline]
96 pub fn len(&self) -> usize {
98 self.pages
99 .iter()
100 .fold(self.current.len(), |c, page| c + page.len())
101 }
102
103 #[inline]
104 pub fn is_empty(&self) -> bool {
106 for p in &self.pages {
107 if !p.is_empty() {
108 return false;
109 }
110 }
111 self.current.len() == 0
112 }
113
114 #[inline]
115 pub fn num_pages(&self) -> usize {
117 if self.current.len() == 0 {
118 self.pages.len()
119 } else {
120 self.pages.len() + 1
121 }
122 }
123
124 pub fn take(&mut self) -> Option<BytePage> {
126 if let Some(page) = self.pages.pop_front() {
127 Some(page)
128 } else if self.current.len() == 0 {
129 None
130 } else {
131 Some(BytePage::from(mem::replace(
132 &mut self.current,
133 StorageVec::sized(self.size),
134 )))
135 }
136 }
137
138 #[inline]
139 pub fn copy_to(&self, pages: &mut BytePages) {
144 for p in &self.pages {
145 pages.append(p.clone());
146 }
147
148 if self.current.len() != 0 {
149 pages.append(BytePage::from(Bytes::copy_from_slice(
150 self.current.as_ref(),
151 )));
152 }
153 }
154
155 #[inline]
156 pub fn move_to(&mut self, pages: &mut BytePages) {
158 while let Some(page) = self.take() {
159 pages.append(page);
160 }
161 }
162
163 #[must_use]
171 pub fn split_to(&mut self, mut at: usize) -> BytePages {
172 let mut pages = BytePages::new(self.size);
173
174 while let Some(mut page) = self.pages.pop_front() {
175 let len = cmp::min(page.len(), at);
176 pages.append(page.split_to(len));
177
178 if !page.is_empty() {
179 self.pages.push_front(page);
180 return pages;
181 }
182 at -= len;
183 }
184
185 if at > 0
186 && let Some(mut page) = self.take()
187 {
188 let len = cmp::min(page.len(), at);
189 pages.append(page.split_to(len));
190 self.append(page);
191 }
192 pages
193 }
194
195 #[inline]
197 #[must_use]
198 pub fn freeze(&mut self) -> Bytes {
199 let mut buf = BytesMut::with_capacity(self.len());
200 while let Some(p) = self.take() {
201 buf.extend_from_slice(&p);
202 }
203 buf.freeze()
204 }
205
206 #[inline]
207 pub fn try_get_current_from(&mut self, pages: &mut BytePages) {
208 if self.pages.is_empty() && self.current.len() == 0 && pages.current.len() != 0 {
209 self.current = mem::replace(&mut pages.current, StorageVec::sized(self.size));
210 }
211 }
212}
213
214impl fmt::Debug for BytePages {
215 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
216 let mut f = fmt.debug_tuple("BytePages");
217 for p in &self.pages {
218 f.field(p);
219 }
220 if self.current.len() != 0 {
221 f.field(&crate::debug::BsDebug(self.current.as_ref()));
222 }
223 f.finish()
224 }
225}
226
227impl Default for BytePages {
228 fn default() -> Self {
229 BytePages::new(BytePageSize::Size16)
230 }
231}
232
233impl BufMut for BytePages {
234 #[inline]
235 fn remaining_mut(&self) -> usize {
236 self.current.remaining()
237 }
238
239 #[inline]
240 unsafe fn advance_mut(&mut self, cnt: usize) {
241 self.current.set_len(self.current.len() + cnt);
243 }
244
245 #[inline]
246 fn chunk_mut(&mut self) -> &mut UninitSlice {
247 unsafe {
248 let ptr = &mut self.current.as_ptr();
250 UninitSlice::from_raw_parts_mut(
251 ptr.add(self.current.len()),
252 self.remaining_mut(),
253 )
254 }
255 }
256
257 fn put_slice(&mut self, mut src: &[u8]) {
258 while !src.is_empty() {
259 let amount = cmp::min(src.len(), self.current.remaining());
260 unsafe {
261 ptr::copy_nonoverlapping(
262 src.as_ptr(),
263 self.chunk_mut().as_mut_ptr(),
264 amount,
265 );
266 self.advance_mut(amount);
267 }
268 src = &src[amount..];
269
270 if self.current.is_full() {
272 self.pages.push_back(BytePage::from(mem::replace(
273 &mut self.current,
274 StorageVec::sized(self.size),
275 )));
276 }
277 }
278 }
279
280 #[inline]
281 fn put_u8(&mut self, n: u8) {
282 self.current.put_u8(n);
283 if self.current.is_full() {
284 self.pages.push_back(BytePage::from(mem::replace(
285 &mut self.current,
286 StorageVec::sized(self.size),
287 )));
288 }
289 }
290
291 #[inline]
292 fn put_i8(&mut self, n: i8) {
293 self.put_u8(n as u8);
294 }
295}
296
297impl Clone for BytePages {
298 fn clone(&self) -> Self {
299 let mut pages = BytePages::new(self.size);
300 self.copy_to(&mut pages);
301 pages
302 }
303}
304
305impl io::Write for BytePages {
306 fn write(&mut self, src: &[u8]) -> Result<usize, io::Error> {
307 self.put_slice(src);
308 Ok(src.len())
309 }
310
311 fn flush(&mut self) -> Result<(), io::Error> {
312 Ok(())
313 }
314}
315
316impl From<BytePages> for Bytes {
317 fn from(pages: BytePages) -> Bytes {
318 BytesMut::from(pages).freeze()
319 }
320}
321
322impl From<BytePages> for BytesMut {
323 fn from(mut pages: BytePages) -> BytesMut {
324 let mut buf = BytesMut::with_capacity(pages.len());
325 while let Some(p) = pages.take() {
326 buf.extend_from_slice(&p);
327 }
328 buf
329 }
330}
331
332pub struct BytePage {
333 inner: StorageType,
334}
335
336enum StorageType {
337 Bytes(Bytes),
338 Storage(StorageVec),
339 Vec(Vec<u8>),
340}
341
342impl BytePage {
343 #[inline]
344 pub fn len(&self) -> usize {
346 match &self.inner {
347 StorageType::Bytes(b) => b.len(),
348 StorageType::Storage(b) => b.len(),
349 StorageType::Vec(b) => b.len(),
350 }
351 }
352
353 #[inline]
354 pub fn is_empty(&self) -> bool {
356 match &self.inner {
357 StorageType::Bytes(b) => b.is_empty(),
358 StorageType::Storage(b) => b.len() == 0,
359 StorageType::Vec(b) => b.is_empty(),
360 }
361 }
362
363 pub fn as_ptr(&self) -> *const u8 {
365 unsafe {
366 match &self.inner {
367 StorageType::Bytes(b) => b.storage.as_ptr(),
368 StorageType::Storage(b) => b.as_ptr(),
369 StorageType::Vec(b) => b.as_ptr(),
370 }
371 }
372 }
373
374 #[must_use]
382 pub fn split_to(&mut self, at: usize) -> BytePage {
383 match &mut self.inner {
384 StorageType::Bytes(b) => {
385 let buf = b.split_to(cmp::min(at, b.len()));
386 BytePage {
387 inner: StorageType::Bytes(buf),
388 }
389 }
390 StorageType::Storage(_) => {
391 let inner = mem::replace(&mut self.inner, StorageType::Bytes(Bytes::new()));
392 if let StorageType::Storage(st) = inner {
393 self.inner = StorageType::Bytes(Bytes {
394 storage: st.freeze(),
395 });
396 self.split_to(at)
397 } else {
398 unreachable!()
399 }
400 }
401 StorageType::Vec(_) => {
402 let inner = mem::replace(&mut self.inner, StorageType::Bytes(Bytes::new()));
403 if let StorageType::Vec(b) = inner {
404 self.inner = StorageType::Bytes(Bytes::copy_from_slice(&b));
405 self.split_to(at)
406 } else {
407 unreachable!()
408 }
409 }
410 }
411 }
412
413 #[inline]
422 pub fn advance_to(&mut self, cnt: usize) {
423 match &mut self.inner {
424 StorageType::Bytes(b) => b.advance_to(cnt),
425 StorageType::Storage(b) => unsafe { b.set_start(cnt as u32) },
426 StorageType::Vec(b) => {
427 self.inner = StorageType::Bytes(Bytes::copy_from_slice(&b[cnt..]));
428 }
429 }
430 }
431
432 #[inline]
434 #[must_use]
435 pub fn freeze(self) -> Bytes {
436 match self.inner {
437 StorageType::Bytes(b) => b,
438 StorageType::Storage(st) => Bytes {
439 storage: st.freeze(),
440 },
441 StorageType::Vec(v) => Bytes::from(v),
442 }
443 }
444
445 fn into_storage(self) -> Result<StorageVec, Self> {
446 if let StorageType::Storage(mut st) = self.inner {
447 if !st.is_full() && st.is_unique() {
449 Ok(st)
450 } else {
451 Err(Self {
452 inner: StorageType::Storage(st),
453 })
454 }
455 } else {
456 Err(self)
457 }
458 }
459}
460
461impl Clone for BytePage {
462 fn clone(&self) -> Self {
463 let inner = match &self.inner {
464 StorageType::Bytes(b) => StorageType::Bytes(b.clone()),
465 StorageType::Storage(st) => {
466 StorageType::Storage(unsafe { st.clone() })
469 }
470 StorageType::Vec(b) => StorageType::Bytes(Bytes::copy_from_slice(b)),
471 };
472
473 Self { inner }
474 }
475}
476
477impl AsRef<[u8]> for BytePage {
478 #[inline]
479 fn as_ref(&self) -> &[u8] {
480 match &self.inner {
481 StorageType::Bytes(b) => b.as_ref(),
482 StorageType::Storage(b) => b.as_ref(),
483 StorageType::Vec(b) => b.as_ref(),
484 }
485 }
486}
487
488impl Borrow<[u8]> for BytePage {
489 #[inline]
490 fn borrow(&self) -> &[u8] {
491 self.as_ref()
492 }
493}
494
495impl From<Bytes> for BytePage {
496 fn from(buf: Bytes) -> Self {
497 BytePage {
498 inner: StorageType::Bytes(buf),
499 }
500 }
501}
502
503impl<'a> From<&'a Bytes> for BytePage {
504 fn from(buf: &'a Bytes) -> Self {
505 BytePage {
506 inner: StorageType::Bytes(buf.clone()),
507 }
508 }
509}
510
511impl From<BytesMut> for BytePage {
512 fn from(buf: BytesMut) -> Self {
513 BytePage {
514 inner: StorageType::Storage(buf.storage),
515 }
516 }
517}
518
519impl From<ByteString> for BytePage {
520 fn from(s: ByteString) -> Self {
521 s.into_bytes().into()
522 }
523}
524
525impl<'a> From<&'a ByteString> for BytePage {
526 fn from(s: &'a ByteString) -> Self {
527 s.clone().into_bytes().into()
528 }
529}
530
531impl From<StorageVec> for BytePage {
532 fn from(buf: StorageVec) -> Self {
533 BytePage {
534 inner: StorageType::Storage(buf),
535 }
536 }
537}
538
539impl From<Vec<u8>> for BytePage {
540 fn from(buf: Vec<u8>) -> Self {
541 BytePage {
542 inner: StorageType::Vec(buf),
543 }
544 }
545}
546
547impl From<&'static str> for BytePage {
548 fn from(buf: &'static str) -> Self {
549 BytePage::from(Bytes::from_static(buf.as_bytes()))
550 }
551}
552
553impl From<&'static [u8]> for BytePage {
554 fn from(buf: &'static [u8]) -> Self {
555 BytePage::from(Bytes::from_static(buf))
556 }
557}
558
559impl<const N: usize> From<&'static [u8; N]> for BytePage {
560 fn from(src: &'static [u8; N]) -> Self {
561 BytePage::from(Bytes::from_static(src))
562 }
563}
564
565impl From<BytePage> for Bytes {
566 fn from(page: BytePage) -> Self {
567 match page.inner {
568 StorageType::Bytes(b) => b,
569 StorageType::Storage(storage) => BytesMut { storage }.freeze(),
570 StorageType::Vec(v) => Bytes::copy_from_slice(&v),
571 }
572 }
573}
574
575impl From<BytePage> for BytesMut {
576 fn from(page: BytePage) -> Self {
577 match page.inner {
578 StorageType::Bytes(b) => b.into(),
579 StorageType::Storage(storage) => BytesMut { storage },
580 StorageType::Vec(v) => BytesMut::copy_from_slice(&v),
581 }
582 }
583}
584
585impl PartialEq for BytePage {
586 fn eq(&self, other: &BytePage) -> bool {
587 self.as_ref() == other.as_ref()
588 }
589}
590
591impl<'a> PartialEq<&'a [u8]> for BytePage {
592 fn eq(&self, other: &&'a [u8]) -> bool {
593 self.as_ref() == *other
594 }
595}
596
597impl<'a, const N: usize> PartialEq<&'a [u8; N]> for BytePage {
598 fn eq(&self, other: &&'a [u8; N]) -> bool {
599 self.as_ref() == other.as_ref()
600 }
601}
602
603impl io::Read for BytePage {
604 fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
605 let len = cmp::min(self.len(), dst.len());
606 if len > 0 {
607 dst[..len].copy_from_slice(&self[..len]);
608 self.advance_to(len);
609 }
610 Ok(len)
611 }
612}
613
614impl ops::Deref for BytePage {
615 type Target = [u8];
616
617 #[inline]
618 fn deref(&self) -> &[u8] {
619 self.as_ref()
620 }
621}
622
623impl fmt::Debug for BytePage {
624 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
625 fmt::Debug::fmt(&crate::debug::BsDebug(self.as_ref()), fmt)
626 }
627}
628
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks shared by every page that is expected to hold `b"123"`.
    fn assert_is_123(page: &BytePage) {
        assert_eq!(page.len(), 3);
        assert!(!page.is_empty());
        assert_eq!(page.as_ref(), b"123");
        assert_eq!(page.as_ref().as_ptr(), page.as_ptr());
    }

    /// Builds the `"123"` page + `"456"` write buffer fixture.
    fn sample_123456() -> BytePages {
        let mut pages = BytePages::default();
        pages.put_slice(b"456");
        pages.prepend(BytePage::from(Bytes::copy_from_slice(b"123")));
        pages
    }

    #[test]
    fn pages() {
        // Writes that span more than one page.
        let mut pages = BytePages::new(BytePageSize::Size8);
        assert!(pages.is_empty());
        assert_eq!(pages.len(), 0);
        assert_eq!(pages.num_pages(), 0);
        pages.extend_from_slice(b"b");
        assert_eq!(pages.len(), 1);
        assert_eq!(pages.num_pages(), 1);
        pages.extend_from_slice("a".repeat(9 * 1024).as_bytes());
        assert_eq!(pages.len(), 9217);
        assert_eq!(pages.num_pages(), 2);
        assert!(!pages.is_empty());

        // Single-byte writes and page roll-over.
        let mut other = BytePages::new(BytePageSize::Size8);
        other.put_i8(b'a' as i8);
        let first = other.take().unwrap();
        assert_eq!(first.len(), 1);
        assert_eq!(first.as_ref(), b"a");

        other.extend_from_slice("a".repeat(8 * 1024 - 1).as_bytes());
        assert_eq!(other.num_pages(), 1);
        other.put_u8(b'a');
        assert_eq!(other.num_pages(), 1);
        assert_eq!(other.current.len(), 0);

        other.put_u8(b'a');
        assert_eq!(other.num_pages(), 2);

        other.append(Bytes::copy_from_slice("a".repeat(8 * 1024).as_bytes()));
        assert_eq!(other.num_pages(), 3);
        assert_eq!(other.current.len(), 0);

        // Pages come out in write order.
        let full = pages.take().unwrap();
        assert_eq!(full.len(), 8192);
        let rest = pages.take().unwrap();
        assert_eq!(rest.len(), 1025);
        assert!(!rest.is_empty());
        assert_eq!(rest.as_ref().as_ptr(), rest.as_ptr());
        assert_eq!(rest.as_ref(), "a".repeat(1025).as_bytes());
        assert!(pages.take().is_none());

        // Conversions into a page.
        let from_bytes = BytePage::from(Bytes::copy_from_slice(b"123"));
        assert_is_123(&from_bytes);

        let from_slice = BytePage::from(&b"123"[..]);
        assert_is_123(&from_slice);

        let from_array = BytePage::from(b"123");
        assert_is_123(&from_array);

        let from_str = BytePage::from("123");
        assert_is_123(&from_str);
        assert_eq!(from_str.freeze(), b"123");

        let from_vec = BytePage::from(vec![b'1', b'2', b'3']);
        assert_is_123(&from_vec);
        assert_eq!(from_vec.freeze(), b"123");

        let mut advanced = BytePage::from(vec![b'1', b'2', b'3']);
        advanced.advance_to(1);
        assert_eq!(advanced.len(), 2);
        assert!(!advanced.is_empty());
        assert_eq!(advanced.as_ref(), b"23");

        // Debug output.
        let mut pages = BytePages::new(BytePageSize::Size8);
        pages.extend_from_slice(b"b");
        assert_eq!(format!("{pages:?}"), "BytePages(b\"b\")");
        let page_b = pages.take().unwrap();
        assert_eq!(page_b.as_ref(), b"b");

        let mut pages = BytePages::new(BytePageSize::Size8);
        pages.extend_from_slice(b"a");
        pages.append(Bytes::copy_from_slice(b"123"));
        pages.pages.push_back(page_b);
        assert_eq!(format!("{pages:?}"), "BytePages(b\"b\", b\"a123\")");
    }

    #[test]
    fn pages_copy_to() {
        // Plain copy.
        let mut src = sample_123456();
        let mut dst = BytePages::default();
        src.copy_to(&mut dst);
        assert_eq!(src.freeze(), b"123456");
        assert_eq!(dst.freeze(), b"123456");

        // Writing to the source afterwards must not affect the copy.
        let mut src = sample_123456();
        let mut dst = BytePages::default();
        src.copy_to(&mut dst);
        src.put_u8(b'7');
        assert_eq!(src.freeze(), b"1234567");
        assert_eq!(dst.freeze(), b"123456");

        // Same through `Clone`.
        let mut src = sample_123456();
        let mut dst = src.clone();
        src.put_u8(b'7');
        assert_eq!(src.freeze(), b"1234567");
        assert_eq!(dst.freeze(), b"123456");
    }

    #[test]
    fn pages_split_to() {
        // Split point inside the first queued page.
        let mut tail = sample_123456();
        let mut head = tail.split_to(1);
        assert_eq!(tail.freeze(), b"23456");
        assert_eq!(head.freeze(), b"1");

        // Split point inside the write buffer.
        let mut tail = sample_123456();
        let mut head = tail.split_to(4);
        assert_eq!(tail.freeze(), b"56");
        assert_eq!(head.freeze(), b"1234");
    }

    #[test]
    fn page_clone() {
        // `Bytes`-backed page.
        let original = BytePage::from(Bytes::copy_from_slice(b"123"));
        let copy = original.clone();
        assert_eq!(original, copy);

        // `StorageVec`-backed page: unique before the clone, shared after.
        let mut original = BytePage::from(BytesMut::copy_from_slice(b"123"));
        match original.inner {
            StorageType::Storage(ref mut st) => assert!(st.is_unique()),
            _ => panic!(),
        }
        let copy = original.clone();
        assert_eq!(original, copy);
        match original.inner {
            StorageType::Storage(mut st) => assert!(!st.is_unique()),
            _ => panic!(),
        }

        // `Vec`-backed page: the clone is `Bytes`-backed.
        let original = BytePage::from(vec![b'1', b'2', b'3']);
        let copy = original.clone();
        assert_eq!(original, copy);
        if !matches!(copy.inner, StorageType::Bytes(_)) {
            panic!()
        }
    }

    #[test]
    fn page_split_to() {
        // `Bytes`-backed page.
        let mut tail = BytePage::from(Bytes::copy_from_slice(b"123"));
        let head = tail.split_to(1);
        assert_eq!(tail, b"23");
        assert_eq!(head, b"1");

        // `StorageVec`-backed page.
        let mut tail = BytePage::from(BytesMut::copy_from_slice(b"123"));
        let head = tail.split_to(1);
        assert_eq!(tail, b"23");
        assert_eq!(head, b"1");

        // `Vec`-backed page.
        let mut tail = BytePage::from(vec![b'1', b'2', b'3']);
        let head = tail.split_to(1);
        assert_eq!(tail, b"23");
        assert_eq!(head, b"1");
    }

    #[test]
    fn page_read() {
        use std::io::Read;

        let mut page = BytePage::from(Bytes::copy_from_slice(b"123"));

        let mut out = [0; 10];
        let read = page.read(&mut out).unwrap();
        assert_eq!(read, 3);
        assert_eq!(page.len(), 0);
        assert_eq!(out, [49, 50, 51, 0, 0, 0, 0, 0, 0, 0]);
    }
}