1#![allow(clippy::precedence, clippy::verbose_bit_mask)]
2
3use crate::storage::{FileLoad, SyncableFile};
53
54use super::util::{self, calculate_width};
55use byteorder::{BigEndian, ByteOrder};
56use bytes::{Buf, BufMut, Bytes, BytesMut};
57use futures::stream::{Stream, StreamExt};
58use std::{cmp::Ordering, convert::TryFrom, error, fmt, io};
59use tokio::io::{AsyncReadExt, AsyncWriteExt};
60use tokio_util::codec::{Decoder, FramedRead};
61
62use itertools::Itertools;
63
// Compile-time guard on the pointer width. `32 >> 3` is 4, so the comparison asks whether
// `usize` is at least 4 bytes (32 bits) wide. If it is, `!(true) as usize` is 0 and the constant
// evaluates to `0 - 0`. If it is not, the expression becomes `0 - 1`, which underflows during
// constant evaluation and fails the build (the reported error mentions the overflow rather than
// the pointer width).
const _: usize = 0 - !(std::mem::size_of::<usize>() >= 32 >> 3) as usize;
67
/// A read-only view over a sequence of unsigned integers that are packed back to back, each
/// using exactly `width` bits, inside big-endian 64-bit words.
#[derive(Clone)]
pub struct LogArray {
    /// Index, counted in elements, of the first element of this view inside `input_buf`.
    /// It is 0 for a freshly parsed array and is advanced by `slice`.
    first: u64,

    /// Number of elements visible through this view.
    len: u64,

    /// Number of bits used to encode each element.
    width: u8,

    /// Buffer holding the packed data words that `entry` reads from.
    input_buf: Bytes,
}
91
92impl std::fmt::Debug for LogArray {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 write!(f, "LogArray([{}])", self.iter().format(", "))
95 }
96}
97
/// Errors reported while validating the buffer and control word of a log array.
#[derive(Debug, PartialEq)]
pub enum LogArrayError {
    /// The buffer is shorter than the 8 bytes needed for a control word. Carries the actual
    /// buffer size in bytes.
    InputBufferTooSmall(usize),
    /// The control word declares an element width above 64 bits. Carries that width.
    WidthTooLarge(u8),
    /// The buffer size does not match what the control word implies. Carries, in order: the
    /// actual buffer size, the expected buffer size, the element count and the element width.
    UnexpectedInputBufferSize(u64, u64, u64, u8),
}
105
106impl LogArrayError {
107 fn validate_input_buf_size(input_buf_size: usize) -> Result<(), Self> {
111 if input_buf_size < 8 {
112 return Err(LogArrayError::InputBufferTooSmall(input_buf_size));
113 }
114 Ok(())
115 }
116
117 fn validate_len_and_width(input_buf_size: usize, len: u64, width: u8) -> Result<(), Self> {
124 if width > 64 {
125 return Err(LogArrayError::WidthTooLarge(width));
126 }
127
128 let expected_buf_size = len * u64::from(width) + 127 >> 6 << 3;
131 let input_buf_size = u64::try_from(input_buf_size).unwrap();
132
133 if input_buf_size != expected_buf_size {
134 return Err(LogArrayError::UnexpectedInputBufferSize(
135 input_buf_size,
136 expected_buf_size,
137 len,
138 width,
139 ));
140 }
141
142 Ok(())
143 }
144
145 fn validate_len_and_width_trailing(
153 input_buf_size: usize,
154 len: u64,
155 width: u8,
156 ) -> Result<(), Self> {
157 if width > 64 {
158 return Err(LogArrayError::WidthTooLarge(width));
159 }
160
161 let expected_buf_size = len * u64::from(width) + 127 >> 6 << 3;
164 let input_buf_size = u64::try_from(input_buf_size).unwrap();
165
166 if input_buf_size < expected_buf_size {
167 return Err(LogArrayError::UnexpectedInputBufferSize(
168 input_buf_size,
169 expected_buf_size,
170 len,
171 width,
172 ));
173 }
174
175 Ok(())
176 }
177}
178
179impl fmt::Display for LogArrayError {
180 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
181 use LogArrayError::*;
182 match self {
183 InputBufferTooSmall(input_buf_size) => {
184 write!(f, "expected input buffer size ({}) >= 8", input_buf_size)
185 }
186 WidthTooLarge(width) => write!(f, "expected width ({}) <= 64", width),
187 UnexpectedInputBufferSize(input_buf_size, expected_buf_size, len, width) => write!(
188 f,
189 "expected input buffer size ({}) to be {} for {} elements and width {}",
190 input_buf_size, expected_buf_size, len, width
191 ),
192 }
193 }
194}
195
/// Marker implementation: `LogArrayError` keeps every default method of `std::error::Error` and
/// relies on its `Debug` and `Display` implementations.
impl error::Error for LogArrayError {}
197
198impl From<LogArrayError> for io::Error {
199 fn from(err: LogArrayError) -> io::Error {
200 io::Error::new(io::ErrorKind::InvalidData, err)
201 }
202}
203
/// Iterator over the elements of a `LogArray`, yielding them in index order.
#[derive(Clone)]
pub struct LogArrayIterator {
    /// The array being iterated; the iterator holds its own clone of the view.
    logarray: LogArray,
    /// Index of the next element to yield.
    pos: usize,
    /// Index at which iteration stops (exclusive).
    end: usize,
}
210
211impl Iterator for LogArrayIterator {
212 type Item = u64;
213 fn next(&mut self) -> Option<u64> {
214 if self.pos == self.end {
215 None
216 } else {
217 let result = self.logarray.entry(self.pos);
218 self.pos += 1;
219
220 Some(result)
221 }
222 }
223}
224
/// Largest element count that `control_word` accepts: the control word stores the length in
/// 56 bits (32 low bits plus 24 high bits), so this is `2^56 - 1`.
const MAX_LOGARRAY_LEN: u64 = (1 << 56) - 1;
226
227pub fn parse_control_word(buf: &[u8]) -> (u64, u8) {
228 let len_1 = BigEndian::read_u32(buf) as u64;
229 let width = buf[4];
230 let len_2 = (BigEndian::read_u32(&buf[4..]) & 0xFFFFFF) as u64; let len: u64 = (len_2 << 32) + len_1;
232
233 (len, width)
234}
235
236fn read_control_word(buf: &[u8], input_buf_size: usize) -> Result<(u64, u8), LogArrayError> {
239 let (len, width) = parse_control_word(buf);
240 LogArrayError::validate_len_and_width(input_buf_size, len, width)?;
241 Ok((len, width))
242}
243
244fn read_control_word_trailing(
247 buf: &[u8],
248 input_buf_size: usize,
249) -> Result<(u64, u8), LogArrayError> {
250 let (len, width) = parse_control_word(buf);
251 LogArrayError::validate_len_and_width_trailing(input_buf_size, len, width)?;
252 Ok((len, width))
253}
254
/// Returns the number of bytes occupied by the packed data of `len` elements of `width` bits
/// each, rounded up to whole 64-bit words. The control word is not included.
///
/// The arithmetic is done in `u64` so that a 56-bit element count is not truncated on targets
/// where `usize` is 32 bits wide.
///
/// # Panics
///
/// Panics if the resulting byte count does not fit in `usize`.
fn logarray_length_from_len_width(len: u64, width: u8) -> usize {
    let num_bits = u64::from(width) * len;
    // Round up to the next whole word when the bits do not end on a word boundary.
    let num_u64 = num_bits / 64 + u64::from(num_bits % 64 != 0);

    usize::try_from(num_u64 * 8).expect("logarray data length does not fit in usize")
}
262
263pub fn logarray_length_from_control_word(buf: &[u8]) -> usize {
264 let (len, width) = parse_control_word(buf);
265
266 logarray_length_from_len_width(len, width)
267}
268
269impl LogArray {
270 pub fn parse(input_buf: Bytes) -> Result<LogArray, LogArrayError> {
272 let input_buf_size = input_buf.len();
273 LogArrayError::validate_input_buf_size(input_buf_size)?;
274 let (len, width) = read_control_word(&input_buf[input_buf_size - 8..], input_buf_size)?;
275 Ok(LogArray {
276 first: 0,
277 len,
278 width,
279 input_buf,
280 })
281 }
282
283 pub fn parse_header_first(mut input_buf: Bytes) -> Result<(LogArray, Bytes), LogArrayError> {
284 let input_buf_size = input_buf.len();
285 LogArrayError::validate_input_buf_size(input_buf_size)?;
286 let (len, width) = read_control_word_trailing(&input_buf[..8], input_buf_size)?;
287 let num_bytes = logarray_length_from_len_width(len, width);
288 input_buf.advance(8);
289 let rest = input_buf.split_off(num_bytes);
290 Ok((
291 LogArray {
292 first: 0,
293 len,
294 width,
295 input_buf,
296 },
297 rest,
298 ))
299 }
300
301 pub fn len(&self) -> usize {
303 usize::try_from(self.len).unwrap()
305 }
306
307 pub fn is_empty(&self) -> bool {
309 self.len == 0
310 }
311
312 pub fn width(&self) -> u8 {
314 self.width
315 }
316
317 pub fn entry(&self, index: usize) -> u64 {
321 debug_assert!(
322 index < self.len(),
323 "expected index ({}) < length ({})",
324 index,
325 self.len
326 );
327
328 let bit_index = usize::from(self.width) * (usize::try_from(self.first).unwrap() + index);
330
331 let byte_index = bit_index >> 6 << 3;
333
334 let buf = &self.input_buf;
335
336 let first_word = BigEndian::read_u64(&buf[byte_index..]);
338
339 let leading_zeros = 64 - self.width;
341
342 let offset = (bit_index & 0b11_1111) as u8;
344
345 if offset + self.width <= 64 {
347 return first_word << offset >> leading_zeros;
349 }
350
351 let second_word = BigEndian::read_u64(&buf[byte_index + 8..]);
357
358 let first_width = 64 - offset;
360 let second_width = self.width - first_width;
361
362 let first_part = first_word << offset >> offset << second_width;
366
367 let second_part = second_word >> 64 - second_width;
369
370 first_part | second_part
372 }
373
374 pub fn iter(&self) -> LogArrayIterator {
375 LogArrayIterator {
376 logarray: self.clone(),
377 pos: 0,
378 end: self.len(),
379 }
380 }
381
382 pub fn slice(&self, offset: usize, len: usize) -> LogArray {
386 let offset = offset as u64;
387 let len = len as u64;
388 let slice_end = offset.checked_add(len).unwrap_or_else(|| {
389 panic!("overflow from slice offset ({}) + length ({})", offset, len)
390 });
391 assert!(
392 slice_end <= self.len,
393 "expected slice offset ({}) + length ({}) <= source length ({})",
394 offset,
395 len,
396 self.len
397 );
398 LogArray {
399 first: self.first + offset,
400 len,
401 width: self.width,
402 input_buf: self.input_buf.clone(),
403 }
404 }
405}
406
/// Builds a log array by packing fixed-width elements into an in-memory buffer.
pub struct LogArrayBufBuilder<B: BufMut> {
    /// Destination buffer that completed 64-bit words are written to.
    buf: B,
    /// Number of bits used to encode each element.
    width: u8,
    /// The 64-bit word currently being filled.
    current: u64,
    /// Bit offset inside `current` at which the next element starts.
    offset: u8,
    /// Number of elements pushed so far.
    count: u64,
}
420
421impl<D: std::ops::DerefMut<Target = BytesMut> + BufMut> LogArrayBufBuilder<D> {
422 pub fn reserve(&mut self, additional: usize) {
423 self.buf.reserve(additional * self.width as usize / 8);
424 }
425}
426
427impl<B: BufMut> LogArrayBufBuilder<B> {
428 pub fn new(buf: B, width: u8) -> Self {
429 Self {
430 buf,
431 width,
432 current: 0,
434 offset: 0,
436 count: 0,
438 }
439 }
440
441 pub fn count(&self) -> u64 {
442 self.count
443 }
444
445 pub fn push(&mut self, val: u64) {
446 let leading_zeros = u64::BITS - self.width as u32;
448
449 if val.leading_zeros() < u32::from(leading_zeros) {
451 panic!("expected value ({}) to fit in {} bits", val, self.width);
452 }
453
454 self.count += 1;
457
458 self.current |= val << leading_zeros >> self.offset;
461
462 self.offset += self.width;
464
465 if self.offset >= 64 {
467 self.buf.put_u64(self.current);
470 self.offset -= 64;
472
473 self.current = if self.offset == 0 {
475 0
477 } else {
478 val << 64 - self.offset
480 };
481 }
482 }
483
484 pub fn push_vec(&mut self, vals: Vec<u64>) {
485 for val in vals {
486 self.push(val);
487 }
488 }
489
490 fn finalize_data(&mut self) {
491 if u64::from(self.count) * u64::from(self.width) & 0b11_1111 != 0 {
492 self.buf.put_u64(self.current);
493 }
494 }
495
496 pub fn finalize(mut self) -> B {
497 self.finalize_data();
498
499 self.write_control_word();
500 self.buf
501 }
502
503 pub(crate) fn finalize_without_control_word(mut self) {
504 self.finalize_data();
505 }
506
507 fn write_control_word(&mut self) {
508 let len = self.count;
509 let width = self.width;
510
511 let buf = control_word(len, width);
512 self.buf.put_slice(&buf);
513 }
514}
515
516pub(crate) fn control_word(len: u64, width: u8) -> [u8; 8] {
517 if len > MAX_LOGARRAY_LEN {
518 panic!(
519 "length is too large for control word of a logarray: {} (limit is {}",
520 len, MAX_LOGARRAY_LEN
521 );
522 }
523 let mut buf = [0; 8];
524 let len_1 = (len & 0xFFFFFFFF) as u32;
525 let len_2 = ((len >> 32) & 0xFFFFFF) as u32;
526 BigEndian::write_u32(&mut buf, len_1);
527 BigEndian::write_u32(&mut buf[4..], len_2);
528 buf[4] = width;
529
530 buf
531}
532
/// Log array builder that collects values first and only decides on the element width when it
/// is finalized, using the widest value that was pushed.
pub struct LateLogArrayBufBuilder<B: BufMut> {
    /// Destination buffer the encoded array is written to on finalization.
    buf: B,
    /// Values collected so far, in insertion order.
    pub vals: Vec<u64>,
    /// Largest width reported by `calculate_width` for any pushed value; starts at 0.
    width: u8,
}
540
541impl<B: BufMut> LateLogArrayBufBuilder<B> {
542 pub fn new(buf: B) -> Self {
543 Self {
544 buf,
545 vals: Vec::new(),
546 width: 0,
547 }
548 }
549
550 pub fn count(&self) -> u64 {
551 self.vals.len() as u64
552 }
553
554 pub fn push(&mut self, val: u64) {
555 self.vals.push(val);
556 let width = calculate_width(val);
557 if self.width < width {
558 self.width = width;
559 }
560 }
561
562 pub fn push_vec(&mut self, vals: Vec<u64>) {
563 for val in vals {
564 self.push(val)
565 }
566 }
567
568 pub fn last(&mut self) -> Option<u64> {
569 self.vals.last().copied()
570 }
571
572 pub fn pop(&mut self) -> Option<u64> {
573 self.vals.pop()
574 }
575
576 pub fn finalize(mut self) -> B {
577 let mut builder = LogArrayBufBuilder::new(&mut self.buf, self.width);
578 builder.push_vec(self.vals);
579 builder.finalize();
580 self.buf
581 }
582
583 pub fn finalize_header_first(mut self) -> B {
584 let control_word = control_word(self.count(), self.width);
585 self.buf.put(control_word.as_ref());
586 let mut builder = LogArrayBufBuilder::new(&mut self.buf, self.width);
587 builder.push_vec(self.vals);
588 builder.finalize_without_control_word();
589 self.buf
590 }
591}
592
/// Builds a log array by packing fixed-width elements and writing them to a file.
pub struct LogArrayFileBuilder<W: SyncableFile> {
    /// Destination file that completed 64-bit words are written to.
    file: W,
    /// Number of bits used to encode each element.
    width: u8,
    /// The 64-bit word currently being filled.
    current: u64,
    /// Bit offset inside `current` at which the next element starts.
    offset: u8,
    /// Number of elements pushed so far.
    count: u64,
}
606
607impl<W: SyncableFile> LogArrayFileBuilder<W> {
608 pub fn new(w: W, width: u8) -> LogArrayFileBuilder<W> {
609 LogArrayFileBuilder {
610 file: w,
611 width,
612 current: 0,
614 offset: 0,
616 count: 0,
618 }
619 }
620
621 pub fn count(&self) -> u64 {
622 self.count
623 }
624
625 pub async fn push(&mut self, val: u64) -> io::Result<()> {
626 let leading_zeros = 64 - self.width;
628
629 if val.leading_zeros() < u32::from(leading_zeros) {
631 return Err(io::Error::new(
632 io::ErrorKind::InvalidData,
633 format!("expected value ({}) to fit in {} bits", val, self.width),
634 ));
635 }
636
637 self.count += 1;
640
641 self.current |= val << leading_zeros >> self.offset;
644
645 self.offset += self.width;
647
648 if self.offset >= 64 {
650 util::write_u64(&mut self.file, self.current).await?;
652 self.offset -= 64;
654
655 self.current = if self.offset == 0 {
657 0
659 } else {
660 val << 64 - self.offset
662 };
663 }
664
665 Ok(())
666 }
667
668 pub async fn push_vec(&mut self, vals: Vec<u64>) -> io::Result<()> {
669 for val in vals {
670 self.push(val).await?;
671 }
672
673 Ok(())
674 }
675
676 pub async fn push_all<S: Stream<Item = io::Result<u64>> + Unpin>(
677 &mut self,
678 mut vals: S,
679 ) -> io::Result<()> {
680 while let Some(val) = vals.next().await {
681 self.push(val?).await?;
682 }
683
684 Ok(())
685 }
686
687 async fn finalize_data(&mut self) -> io::Result<()> {
688 if self.count * u64::from(self.width) & 0b11_1111 != 0 {
689 util::write_u64(&mut self.file, self.current).await?;
690 }
691
692 Ok(())
693 }
694
695 pub async fn finalize(mut self) -> io::Result<()> {
696 let len = self.count;
697 let width = self.width;
698
699 self.finalize_data().await?;
701
702 let buf = control_word(len, width);
704 self.file.write_all(&buf).await?;
705
706 self.file.flush().await?;
707 self.file.sync_all().await?;
708
709 Ok(())
710 }
711}
712
/// State of the streaming decoder that turns a byte stream of packed data words into elements.
struct LogArrayDecoder {
    /// The most recently read 64-bit data word.
    current: u64,
    /// Number of bits used to encode each element.
    width: u8,
    /// Bit offset inside `current` at which the next element starts; 64 means the word has been
    /// fully consumed.
    offset: u8,
    /// Number of elements that still have to be decoded.
    remaining: u64,
}
723
724impl fmt::Debug for LogArrayDecoder {
725 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
726 write!(f, "LogArrayDecoder {{ current: ")?;
727 write!(f, "{:#066b}", self.current)?;
728 write!(f, ", width: ")?;
729 write!(f, "{:?}", self.width)?;
730 write!(f, ", offset: ")?;
731 write!(f, "{:?}", self.offset)?;
732 write!(f, ", remaining: ")?;
733 write!(f, "{:?}", self.remaining)?;
734 write!(f, " }}")
735 }
736}
737
738impl LogArrayDecoder {
739 fn new_unchecked(width: u8, remaining: u64) -> Self {
744 LogArrayDecoder {
745 current: 0,
747 offset: 64,
750 width,
751 remaining,
752 }
753 }
754}
755
/// Streaming decoder: consumes big-endian 64-bit data words from the input and yields one
/// element per successful call until `remaining` reaches zero.
impl Decoder for LogArrayDecoder {
    type Item = u64;
    type Error = io::Error;

    /// Decodes the next element if enough input is available.
    ///
    /// Returns `Ok(None)` either when all elements have been produced (whatever is left in
    /// `bytes` is discarded) or when a new data word is needed but fewer than 8 bytes are
    /// buffered. This implementation never returns `Err`.
    fn decode(&mut self, bytes: &mut BytesMut) -> Result<Option<u64>, io::Error> {
        // All elements have been decoded: drop any leftover input and stop.
        if self.remaining == 0 {
            bytes.clear();
            return Ok(None);
        }

        // Snapshot of the state before this call; from here on `self.<field>` is only written.
        let first_word = self.current;
        let offset = self.offset;
        let width = self.width;

        // Number of leading zero bits a decoded element must have.
        let leading_zeros = 64 - width;

        // The element lies entirely inside the word we already have.
        if offset + width <= 64 {
            // Move to the start of the next element.
            self.offset += width;
            self.remaining -= 1;
            // Shift out the bits before the element, then right-align it.
            return Ok(Some(first_word << offset >> leading_zeros));
        }

        // A new word is required; wait for more input if a full word is not buffered yet.
        if bytes.len() < 8 {
            return Ok(None);
        }

        // Consume the next word from the input.
        let second_word = BigEndian::read_u64(&bytes.split_to(8));
        self.current = second_word;

        self.remaining -= 1;

        // The previous word was fully consumed, so the element starts at the top of the new
        // word.
        if offset == 64 {
            self.offset = width;

            return Ok(Some(second_word >> leading_zeros));
        }

        // The element straddles both words: `first_width` bits at the end of the first word
        // and `second_width` bits at the start of the second.
        let first_width = 64 - offset;
        let second_width = width - first_width;

        // Upper bits: clear everything before the element, then make room for the lower bits.
        let first_part = first_word << offset >> offset << second_width;

        // Lower bits: the top `second_width` bits of the second word, right-aligned.
        let second_part = second_word >> 64 - second_width;

        // The next element starts right after the bits taken from the second word.
        self.offset = second_width;

        Ok(Some(first_part | second_part))
    }
}
838
839pub async fn logarray_file_get_length_and_width<F: FileLoad>(f: F) -> io::Result<(u64, u8)> {
840 LogArrayError::validate_input_buf_size(f.size().await?)?;
841
842 let mut buf = [0; 8];
843 f.open_read_from(f.size().await? - 8)
844 .await?
845 .read_exact(&mut buf)
846 .await?;
847 Ok(read_control_word(&buf, f.size().await?)?)
848}
849
850pub async fn logarray_stream_entries<F: 'static + FileLoad>(
851 f: F,
852) -> io::Result<impl Stream<Item = io::Result<u64>> + Unpin + Send> {
853 let (len, width) = logarray_file_get_length_and_width(f.clone()).await?;
854 Ok(FramedRead::new(
855 f.open_read().await?,
856 LogArrayDecoder::new_unchecked(width, len),
857 ))
858}
859
/// A `LogArray` whose elements are expected to be in non-decreasing order, which allows lookups
/// by binary search. The ordering is only verified by `from_logarray` in builds with debug
/// assertions enabled.
#[derive(Clone)]
pub struct MonotonicLogArray(LogArray);
862
863impl std::fmt::Debug for MonotonicLogArray {
864 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
865 write!(f, "MonotonicLogArray([{}])", self.iter().format(", "))
866 }
867}
868
869impl MonotonicLogArray {
870 pub fn from_logarray(logarray: LogArray) -> MonotonicLogArray {
871 if cfg!(debug_assertions) {
872 let mut iter = logarray.iter();
874 if let Some(mut pred) = iter.next() {
875 for succ in iter {
876 assert!(
877 pred <= succ,
878 "not monotonic: expected predecessor ({}) <= successor ({})",
879 pred,
880 succ
881 );
882 pred = succ;
883 }
884 }
885 }
886
887 MonotonicLogArray(logarray)
888 }
889
890 pub fn parse(bytes: Bytes) -> Result<MonotonicLogArray, LogArrayError> {
891 let logarray = LogArray::parse(bytes)?;
892
893 Ok(Self::from_logarray(logarray))
894 }
895
896 pub fn parse_header_first(bytes: Bytes) -> Result<(MonotonicLogArray, Bytes), LogArrayError> {
897 let (logarray, remainder) = LogArray::parse_header_first(bytes)?;
898
899 Ok((Self::from_logarray(logarray), remainder))
900 }
901
902 pub fn len(&self) -> usize {
903 self.0.len()
904 }
905
906 pub fn is_empty(&self) -> bool {
907 self.0.is_empty()
908 }
909
910 pub fn entry(&self, index: usize) -> u64 {
911 self.0.entry(index)
912 }
913
914 pub fn iter(&self) -> LogArrayIterator {
915 self.0.iter()
916 }
917
918 pub fn index_of(&self, element: u64) -> Option<usize> {
919 let index = self.nearest_index_of(element);
920 if index >= self.len() || self.entry(index) != element {
921 None
922 } else {
923 Some(index)
924 }
925 }
926
927 pub fn nearest_index_of(&self, element: u64) -> usize {
928 if self.is_empty() {
929 return 0;
930 }
931
932 let mut min = 0;
933 let mut max = self.len() - 1;
934 while min <= max {
935 let mid = (min + max) / 2;
936 match element.cmp(&self.entry(mid)) {
937 Ordering::Equal => return mid,
938 Ordering::Greater => min = mid + 1,
939 Ordering::Less => {
940 if mid == 0 {
941 return 0;
942 }
943 max = mid - 1
944 }
945 }
946 }
947
948 (min + max) / 2 + 1
949 }
950
951 pub fn slice(&self, offset: usize, len: usize) -> MonotonicLogArray {
952 Self(self.0.slice(offset, len))
953 }
954}
955
956impl From<LogArray> for MonotonicLogArray {
957 fn from(l: LogArray) -> Self {
958 Self::from_logarray(l)
959 }
960}
961
// Unit tests for parsing, building, streaming and searching log arrays.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::MemoryBackedStore;
    use crate::storage::FileStore;
    use crate::util::stream_iter_ok;
    use futures::executor::block_on;
    use futures::stream::TryStreamExt;

    // Checks the `Display` text of every error variant and the conversion into `io::Error`.
    #[test]
    fn log_array_error() {
        assert_eq!(
            "expected input buffer size (7) >= 8",
            LogArrayError::InputBufferTooSmall(7).to_string()
        );
        assert_eq!(
            "expected width (69) <= 64",
            LogArrayError::WidthTooLarge(69).to_string()
        );
        assert_eq!(
            "expected input buffer size (9) to be 8 for 0 elements and width 17",
            LogArrayError::UnexpectedInputBufferSize(9, 8, 0, 17).to_string()
        );

        // `io::Error` does not implement `PartialEq`, so the rendered messages are compared.
        assert_eq!(
            io::Error::new(
                io::ErrorKind::InvalidData,
                LogArrayError::InputBufferTooSmall(7)
            )
            .to_string(),
            io::Error::from(LogArrayError::InputBufferTooSmall(7)).to_string()
        );
    }

    // The buffer must be at least 8 bytes (one control word) long.
    #[test]
    fn validate_input_buf_size() {
        let val = |buf_size| LogArrayError::validate_input_buf_size(buf_size);
        let err = |buf_size| Err(LogArrayError::InputBufferTooSmall(buf_size));
        assert_eq!(err(7), val(7));
        assert_eq!(Ok(()), val(8));
        assert_eq!(Ok(()), val(9));
        assert_eq!(Ok(()), val(usize::max_value()));
    }

    // Exercises the exact-size validation with width, empty-array and rounding edge cases.
    #[test]
    fn validate_len_and_width() {
        let val =
            |buf_size, len, width| LogArrayError::validate_len_and_width(buf_size, len, width);

        let err = |width| Err(LogArrayError::WidthTooLarge(width));

        // Width above 64 is rejected before the size is looked at.
        assert_eq!(err(65), val(0, 0, 65));

        let err = |buf_size, expected, len, width| {
            Err(LogArrayError::UnexpectedInputBufferSize(
                buf_size, expected, len, width,
            ))
        };

        // Even an empty array needs 8 bytes for the control word.
        assert_eq!(err(0, 8, 0, 0), val(0, 0, 0));

        // Empty array and single 1-bit element.
        assert_eq!(Ok(()), val(8, 0, 1));
        assert_eq!(err(9, 8, 0, 1), val(9, 0, 1));
        assert_eq!(Ok(()), val(16, 1, 1));

        // Full-width (64-bit) elements.
        assert_eq!(Ok(()), val(16, 1, 64));
        assert_eq!(err(16, 24, 2, 64), val(16, 2, 64));
        assert_eq!(err(24, 16, 1, 64), val(24, 1, 64));

        // Largest 32-bit element count with 64-bit elements; only representable on 64-bit
        // targets.
        #[cfg(target_pointer_width = "64")]
        assert_eq!(
            Ok(()),
            val(
                usize::try_from(u64::from(u32::max_value()) + 1 << 3).unwrap(),
                u32::max_value() as u64,
                64
            )
        );

        // 13 elements of 5 bits need 65 bits, i.e. two data words plus the control word.
        assert_eq!(err(16, 24, 13, 5), val(16, 13, 5));
        assert_eq!(Ok(()), val(24, 13, 5));
    }

    // A buffer holding only an all-zero control word parses as an empty array.
    #[test]
    pub fn empty() {
        let logarray = LogArray::parse(Bytes::from([0u8; 8].as_ref())).unwrap();
        assert!(logarray.is_empty());
        assert!(MonotonicLogArray::from_logarray(logarray).is_empty());
    }

    // A late builder that only ever saw the value 0 still produces a readable array.
    #[test]
    pub fn late_logarray_just_zero() {
        let buf = BytesMut::new();
        let mut builder = LateLogArrayBufBuilder::new(buf);
        builder.push(0);
        let logarray_buf = builder.finalize().freeze();
        let logarray = LogArray::parse(logarray_buf).unwrap();
        assert_eq!(logarray.entry(0_usize), 0_u64);
    }

    // Pushing a value that does not fit in the width returns an error, which `unwrap` turns
    // into the expected panic.
    #[tokio::test]
    #[should_panic(expected = "expected value (8) to fit in 3 bits")]
    async fn log_array_file_builder_panic() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 3);
        block_on(builder.push(8)).unwrap();
    }

    // Round trip: write with the file builder, then parse and read back each entry.
    #[tokio::test]
    async fn generate_then_parse_works() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        block_on(async {
            builder
                .push_all(stream_iter_ok(vec![1, 3, 2, 5, 12, 31, 18]))
                .await?;
            builder.finalize().await?;

            Ok::<_, io::Error>(())
        })
        .unwrap();

        let content = block_on(store.map()).unwrap();

        let logarray = LogArray::parse(content).unwrap();

        assert_eq!(1, logarray.entry(0));
        assert_eq!(3, logarray.entry(1));
        assert_eq!(2, logarray.entry(2));
        assert_eq!(5, logarray.entry(3));
        assert_eq!(12, logarray.entry(4));
        assert_eq!(31, logarray.entry(5));
        assert_eq!(18, logarray.entry(6));
    }

    // One data word holding the 17-bit elements 1, 2 and 3; the `_` in each literal marks the
    // end of an element.
    const TEST0_DATA: [u8; 8] = [
        0b00000000,
        0b00000000,
        0b1_0000000,
        0b00000000,
        0b10_000000,
        0b00000000,
        0b011_00000,
        0b00000000,
    ];
    // Control word for `TEST0_DATA`: 3 elements of width 17.
    const TEST0_CONTROL: [u8; 8] = [0, 0, 0, 3, 17, 0, 0, 0];
    // A second data word continuing the 17-bit sequence with 4, 5, 6 and 7; the first element
    // starts in the last 13 bits of the preceding word.
    const TEST1_DATA: [u8; 8] = [
        0b0100_0000,
        0b00000000,
        0b00101_000,
        0b00000000,
        0b000110_00,
        0b00000000,
        0b0000111_0,
        0b00000000,
    ];

    // Builds the three-element array described by `TEST0_DATA` and `TEST0_CONTROL`.
    fn test0_logarray() -> LogArray {
        let mut content = Vec::new();
        content.extend_from_slice(&TEST0_DATA);
        content.extend_from_slice(&TEST0_CONTROL);
        LogArray::parse(Bytes::from(content)).unwrap()
    }

    // Reading one past the last element must panic.
    #[test]
    #[should_panic(expected = "expected index (3) < length (3)")]
    fn entry_panic() {
        let _ = test0_logarray().entry(3);
    }

    // A slice may not extend past the end of its source.
    #[test]
    #[should_panic(expected = "expected slice offset (2) + length (2) <= source length (3)")]
    fn slice_panic1() {
        let _ = test0_logarray().slice(2, 2);
    }

    // A slice offset beyond the 32-bit range is rejected by the same bounds check.
    #[test]
    #[should_panic(expected = "expected slice offset (4294967296)")]
    #[cfg(target_pointer_width = "64")]
    fn slice_panic2() {
        let _ = test0_logarray().slice(usize::try_from(u32::max_value()).unwrap() + 1, 2);
    }

    // Indexing is bounded by the slice length, not by the length of the source array.
    #[test]
    #[should_panic(expected = "expected index (2) < length (2)")]
    fn slice_entry_panic() {
        let _ = test0_logarray().slice(1, 2).entry(2);
    }

    // The monotonicity check only exists when debug assertions are enabled.
    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "not monotonic: expected predecessor (2) <= successor (1)")]
    fn monotonic_panic() {
        // Two 32-bit elements, 2 then 1, followed by the control word (length 2, width 32).
        let content = [0u8, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 32, 0, 0, 0].as_ref();
        MonotonicLogArray::from_logarray(LogArray::parse(Bytes::from(content)).unwrap());
    }

    // Drives the streaming decoder by hand, including an element that spans two data words.
    #[test]
    fn decode() {
        // Only one element requested: the rest of the input is discarded.
        let mut decoder = LogArrayDecoder::new_unchecked(17, 1);
        let mut bytes = BytesMut::from(TEST0_DATA.as_ref());
        assert_eq!(Some(1), Decoder::decode(&mut decoder, &mut bytes).unwrap());
        assert_eq!(None, Decoder::decode(&mut decoder, &mut bytes).unwrap());
        // Four elements requested: the fourth needs a second word that arrives later.
        decoder = LogArrayDecoder::new_unchecked(17, 4);
        bytes = BytesMut::from(TEST0_DATA.as_ref());
        assert_eq!(Some(1), Decoder::decode(&mut decoder, &mut bytes).unwrap());
        assert_eq!(
            "LogArrayDecoder { current: \
             0b0000000000000000100000000000000010000000000000000110000000000000, width: 17, \
             offset: 17, remaining: 3 }",
            format!("{:?}", decoder)
        );
        assert_eq!(Some(2), Decoder::decode(&mut decoder, &mut bytes).unwrap());
        assert_eq!(Some(3), Decoder::decode(&mut decoder, &mut bytes).unwrap());
        // Not enough input for the next element yet.
        assert_eq!(None, Decoder::decode(&mut decoder, &mut bytes).unwrap());
        bytes.extend(TEST1_DATA.iter());
        assert_eq!(Some(4), Decoder::decode(&mut decoder, &mut bytes).unwrap());
        assert_eq!(None, Decoder::decode(&mut decoder, &mut bytes).unwrap());
    }

    // Each malformed file must be reported with the matching `LogArrayError`.
    #[tokio::test]
    async fn logarray_file_get_length_and_width_errors() {
        // File shorter than a control word.
        let store = MemoryBackedStore::new();
        let mut writer = store.open_write().await.unwrap();
        writer.write_all(&[0, 0, 0]).await.unwrap();
        writer.sync_all().await.unwrap();
        assert_eq!(
            io::Error::from(LogArrayError::InputBufferTooSmall(3)).to_string(),
            block_on(logarray_file_get_length_and_width(store))
                .err()
                .unwrap()
                .to_string()
        );

        // Control word declaring a width of 65 bits.
        let store = MemoryBackedStore::new();
        let mut writer = store.open_write().await.unwrap();
        writer.write_all(&[0, 0, 0, 0, 65, 0, 0, 0]).await.unwrap();
        writer.sync_all().await.unwrap();
        assert_eq!(
            io::Error::from(LogArrayError::WidthTooLarge(65)).to_string(),
            block_on(logarray_file_get_length_and_width(store))
                .err()
                .unwrap()
                .to_string()
        );

        // Control word declaring one element, but the file holds no data word.
        let store = MemoryBackedStore::new();
        let mut writer = store.open_write().await.unwrap();
        writer.write_all(&[0, 0, 0, 1, 17, 0, 0, 0]).await.unwrap();
        writer.sync_all().await.unwrap();
        assert_eq!(
            io::Error::from(LogArrayError::UnexpectedInputBufferSize(8, 16, 1, 17)).to_string(),
            block_on(logarray_file_get_length_and_width(store))
                .err()
                .unwrap()
                .to_string()
        );
    }

    // Round trip: write with the file builder, then read everything back as a stream.
    #[tokio::test]
    async fn generate_then_stream_works() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        block_on(async {
            builder.push_all(stream_iter_ok(0..31)).await?;
            builder.finalize().await?;

            Ok::<_, io::Error>(())
        })
        .unwrap();

        let entries: Vec<u64> = block_on(
            logarray_stream_entries(store)
                .await
                .unwrap()
                .try_collect::<Vec<u64>>(),
        )
        .unwrap();
        let expected: Vec<u64> = (0..31).collect();
        assert_eq!(expected, entries);
    }

    // The iterator yields every element in insertion order.
    #[tokio::test]
    async fn iterate_over_logarray() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        let original = vec![1, 3, 2, 5, 12, 31, 18];
        block_on(async {
            builder.push_all(stream_iter_ok(original.clone())).await?;
            builder.finalize().await?;

            Ok::<_, io::Error>(())
        })
        .unwrap();

        let content = block_on(store.map()).unwrap();

        let logarray = LogArray::parse(content).unwrap();

        let result: Vec<u64> = logarray.iter().collect();

        assert_eq!(original, result);
    }

    // Iterating a slice yields only the elements inside the slice.
    #[tokio::test]
    async fn iterate_over_logarray_slice() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        let original: Vec<u64> = vec![1, 3, 2, 5, 12, 31, 18];
        block_on(async {
            builder.push_all(stream_iter_ok(original)).await?;
            builder.finalize().await?;

            Ok::<_, io::Error>(())
        })
        .unwrap();

        let content = block_on(store.map()).unwrap();

        let logarray = LogArray::parse(content).unwrap();
        let slice = logarray.slice(2, 3);

        let result: Vec<u64> = slice.iter().collect();

        assert_eq!([2, 5, 12], result.as_ref());
    }

    // `index_of` finds every stored value and reports `None` for a missing one.
    #[tokio::test]
    async fn monotonic_logarray_index_lookup() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        let original = vec![1, 3, 5, 6, 7, 10, 11, 15, 16, 18, 20, 25, 31];
        block_on(async {
            builder.push_all(stream_iter_ok(original.clone())).await?;
            builder.finalize().await?;

            Ok::<_, io::Error>(())
        })
        .unwrap();

        let content = block_on(store.map()).unwrap();

        let logarray = LogArray::parse(content).unwrap();
        let monotonic = MonotonicLogArray::from_logarray(logarray);

        for (i, &val) in original.iter().enumerate() {
            assert_eq!(i, monotonic.index_of(val).unwrap());
        }

        assert_eq!(None, monotonic.index_of(12));
        assert_eq!(original.len(), monotonic.len());
    }

    // `nearest_index_of` returns the match, or the position of the next larger element.
    #[tokio::test]
    async fn monotonic_logarray_near_index_lookup() {
        let store = MemoryBackedStore::new();
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 5);
        let original = vec![3, 5, 6, 7, 10, 11, 15, 16, 18, 20, 25, 31];
        block_on(async {
            builder.push_all(stream_iter_ok(original.clone())).await?;
            builder.finalize().await?;
            Ok::<_, io::Error>(())
        })
        .unwrap();

        let content = block_on(store.map()).unwrap();

        let logarray = LogArray::parse(content).unwrap();
        let monotonic = MonotonicLogArray::from_logarray(logarray);

        for (i, &val) in original.iter().enumerate() {
            assert_eq!(i, monotonic.index_of(val).unwrap());
        }

        // Expected results for the probes 1 through 32, in order.
        let nearest: Vec<_> = (1..=32).map(|i| monotonic.nearest_index_of(i)).collect();
        let expected = vec![
            0, 0, 0, 1, 1, 2, 3, 4, 4, 4, 5, 6, 6, 6, 6, 7, 8, 8, 9, 9, 10, 10, 10, 10, 10, 11, 11,
            11, 11, 11, 11, 12,
        ];
        assert_eq!(expected, nearest);
    }

    // 16 elements of 4 bits fill exactly one data word; no padding word may be written.
    #[tokio::test]
    async fn writing_64_bits_of_data() {
        let store = MemoryBackedStore::new();
        let original = vec![1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8];
        let mut builder = LogArrayFileBuilder::new(store.open_write().await.unwrap(), 4);
        block_on(async {
            builder.push_all(stream_iter_ok(original.clone())).await?;
            builder.finalize().await?;

            Ok::<_, io::Error>(())
        })
        .unwrap();

        let content = block_on(store.map()).unwrap();
        let logarray = LogArray::parse(content).unwrap();
        assert_eq!(original, logarray.iter().collect::<Vec<_>>());
        assert_eq!(16, logarray.len());
        assert_eq!(4, logarray.width());
    }

    // The largest supported length (56 bits set) survives an encode/decode round trip.
    #[test]
    fn large_control_word() {
        let num: u64 = 0xFF_FFFF_FFFF_FFFF;
        let width: u8 = 32;

        let control_word = control_word(num, width);
        assert_eq!([255, 255, 255, 255, 32, 255, 255, 255], control_word);
        let (out_num, out_width) = parse_control_word(&control_word);
        assert_eq!(num, out_num);
        assert_eq!(width, out_width);
    }
}