use std::collections::VecDeque;
use serde::Deserialize;
use serde::Serialize;
use crate::compression::compress;
use crate::compression::decompress;
use crate::ChunkSize;
/// A double-ended queue that keeps its middle section as compressed chunks.
///
/// Logically the elements are ordered as: `uncompressed_buffer_front`, then every
/// chunk in `compressed_storage` (front to back), then `uncompressed_buffer_back`.
///
/// NOTE(review): the derived `PartialEq`/`Ord`/`Hash` impls operate on these
/// fields as stored (including the compressed bytes and the settings), so two
/// deques holding the same logical elements but buffered differently may not
/// compare equal — confirm this is intended.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize, Deserialize)]
pub struct Deque<T> {
/// Uncompressed elements at the logical front; `push_front` and `pop_front` operate here.
uncompressed_buffer_front: VecDeque<T>,
/// Uncompressed elements at the logical back; `push_back` and `pop_back` operate here.
uncompressed_buffer_back: VecDeque<T>,
/// Chunks produced by `compress`, one byte vector per flushed buffer.
compressed_storage: VecDeque<Vec<u8>>,
/// Number of elements an uncompressed buffer may reach before a push compresses it.
chunk_size: usize,
/// Level handed to `compress` unchanged on every flush.
compression_level: i32,
}
impl<T> Deque<T> {
    /// Creates an empty deque using `ChunkSize::Default` and compression level `0`.
    pub fn new() -> Deque<T> {
        Deque::new_with_options(ChunkSize::Default, 0)
    }

    /// Creates an empty deque with an explicit chunk size and compression level.
    ///
    /// `chunksize` is converted to a number of elements per chunk; the byte-based
    /// variants are divided by the size of `T`. A resulting chunk size of zero
    /// behaves like a chunk size of one: every push compresses immediately.
    ///
    /// `compression_level` is stored and passed unchanged to `compress`.
    pub fn new_with_options(chunksize: ChunkSize, compression_level: i32) -> Deque<T> {
        // Zero-sized types report a size of 0; count them as one byte so the
        // byte-based variants below can never divide by zero.
        let elementsize = std::mem::size_of::<T>().max(1);
        let chunk_size = match chunksize {
            ChunkSize::SizeElements(x) => x,
            ChunkSize::SizeBytes(x) => x / elementsize,
            // Saturate instead of overflowing on absurdly large megabyte counts.
            ChunkSize::SizeMB(x) => x.saturating_mul(1024 * 1024) / elementsize,
            ChunkSize::Default => 10 * 1024 * 1024 / elementsize,
        };
        Deque {
            uncompressed_buffer_front: VecDeque::new(),
            uncompressed_buffer_back: VecDeque::new(),
            compressed_storage: VecDeque::new(),
            chunk_size,
            compression_level,
        }
    }

    /// Appends `value` to the back of the deque.
    ///
    /// Once the back buffer holds at least `chunk_size` elements it is compressed,
    /// appended to the compressed storage, and emptied.
    pub fn push_back(&mut self, value: T)
    where
        T: Serialize,
    {
        self.uncompressed_buffer_back.push_back(value);
        if self.uncompressed_buffer_back.len() >= self.chunk_size {
            let compressed = compress(&self.uncompressed_buffer_back, self.compression_level);
            self.compressed_storage.push_back(compressed);
            // `clear` keeps the allocation so the next chunk can reuse it.
            self.uncompressed_buffer_back.clear();
        }
    }

    /// Prepends `value` to the front of the deque.
    ///
    /// Once the front buffer holds at least `chunk_size` elements it is compressed,
    /// prepended to the compressed storage, and emptied.
    pub fn push_front(&mut self, value: T)
    where
        T: Serialize,
    {
        self.uncompressed_buffer_front.push_front(value);
        if self.uncompressed_buffer_front.len() >= self.chunk_size {
            let compressed = compress(&self.uncompressed_buffer_front, self.compression_level);
            self.compressed_storage.push_front(compressed);
            // `clear` keeps the allocation so the next chunk can reuse it.
            self.uncompressed_buffer_front.clear();
        }
    }

    /// Removes and returns the last element, or `None` if the deque is empty.
    ///
    /// When the back buffer is empty it is refilled from the last compressed
    /// chunk; if there is none, the front buffer is moved over instead.
    pub fn pop_back(&mut self) -> Option<T>
    where
        T: for<'a> Deserialize<'a>,
    {
        if self.uncompressed_buffer_back.is_empty() {
            if let Some(x) = self.compressed_storage.pop_back() {
                self.uncompressed_buffer_back = decompress(&x);
            } else {
                // No compressed chunks left: whatever remains lives in the front buffer.
                self.uncompressed_buffer_back =
                    std::mem::take(&mut self.uncompressed_buffer_front);
            }
        }
        self.uncompressed_buffer_back.pop_back()
    }

    /// Removes and returns the first element, or `None` if the deque is empty.
    ///
    /// When the front buffer is empty it is refilled from the first compressed
    /// chunk; if there is none, the back buffer is moved over instead.
    pub fn pop_front(&mut self) -> Option<T>
    where
        T: for<'a> Deserialize<'a>,
    {
        if self.uncompressed_buffer_front.is_empty() {
            if let Some(x) = self.compressed_storage.pop_front() {
                self.uncompressed_buffer_front = decompress(&x);
            } else {
                // No compressed chunks left: whatever remains lives in the back buffer.
                self.uncompressed_buffer_front =
                    std::mem::take(&mut self.uncompressed_buffer_back);
            }
        }
        self.uncompressed_buffer_front.pop_front()
    }
}
impl<T> Default for Deque<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Iterator for Deque<T>
where
T: Serialize + for<'a> Deserialize<'a>,
{
type Item = T;
fn next(&mut self) -> Option<Self::Item>
where
T: Serialize + for<'a> Deserialize<'a>,
{
self.pop_front()
}
}
impl<T> FromIterator<T> for Deque<T>
where
    T: Serialize + for<'de> Deserialize<'de>,
{
    /// Builds a deque with the default options by pushing every item of `iter`
    /// onto the back, preserving the iteration order.
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = T>,
    {
        let mut deque = Deque::new();
        iter.into_iter().for_each(|item| deque.push_back(item));
        deque
    }
}
#[cfg(test)]
mod tests {
    use crate::*;

    /// Pushes more than one chunk of distinct values onto the back and checks
    /// that popping from the front matches a plain `VecDeque` element by element.
    #[test]
    fn simple_test() {
        let mut big_vecdeque = std::collections::VecDeque::new();
        let mut compressed_deque = Deque::new_with_options(ChunkSize::SizeElements(1024 * 9), 0);
        // Distinct values so that an ordering mistake cannot go unnoticed.
        for i in 0..(1024 * 10) {
            big_vecdeque.push_back(i);
            compressed_deque.push_back(i);
        }
        loop {
            let a = big_vecdeque.pop_front();
            let b = compressed_deque.pop_front();
            assert_eq!(a, b);
            if a.is_none() || b.is_none() {
                break;
            }
        }
    }

    /// Checks that iterating a `Deque` yields the same sequence as iterating a
    /// plain `VecDeque` filled with the same values.
    #[test]
    fn iter_test() {
        let mut big_vecdeque = std::collections::VecDeque::new();
        let mut compressed_deque = Deque::new_with_options(ChunkSize::SizeElements(1024 * 9), 0);
        for i in 0_u32..(1024 * 10) {
            let value = f64::from(i);
            big_vecdeque.push_back(value);
            compressed_deque.push_back(value);
        }
        let mut big_vecdeque_it = big_vecdeque.into_iter();
        let mut compressed_deque_it = compressed_deque.into_iter();
        loop {
            let a = big_vecdeque_it.next();
            let b = compressed_deque_it.next();
            assert_eq!(a, b);
            if a.is_none() || b.is_none() {
                break;
            }
        }
    }
}