use std::borrow::BorrowMut;
use std::io::{Cursor, Read};
use std::sync::Arc;
use crate::encoding::decoder::{Decoder, Field, IntPlainDecoder, LongBinaryDecoder};
use byteorder::ReadBytesExt;
use varint::VarintRead;
use crate::chunk;
use crate::encoding::decoder;
use crate::file::compress::Snappy;
use crate::file::metadata::{ChunkMetadata, TSDataType, TimeseriesMetadata};
use crate::file::reader::{ChunkReader, PageReader, SectionReader, SensorReader};
use crate::file::statistics::{
BinaryStatistics, BooleanStatistics, DoubleStatistics, FloatStatistics, IntegerStatistics,
LongStatistics, Statistic,
};
use crate::file::{compress, statistics};
use crate::utils::cursor;
use crate::utils::cursor::VarIntReader;
use snafu::{ensure, ResultExt, Snafu};
/// Errors raised while locating, reading and decoding chunks and pages.
#[derive(Debug, Snafu)]
pub enum Error {
    /// The page payload could not be decompressed.
    #[snafu(display("Unable to decompress chunk data: {}", source))]
    DecompressChunkData { source: compress::Error },
    /// A decoder rejected the page data.
    // The message previously duplicated the decompression text, which made
    // decode failures indistinguishable from decompression failures in logs.
    #[snafu(display("Unable to decode page data: {}", source))]
    DecodePageData { source: decoder::Error },
    /// The requested chunk index `i` is not below the number of chunks `len`.
    #[snafu(display("Unable to get chunk reader i:{}, max length:{}", i, len))]
    GetChunkReaderI { i: usize, len: usize },
    /// The section reader could not provide a cursor.
    #[snafu(display("Unable to get cursor from reader, {}", source))]
    GetCursor { source: crate::file::reader::Error },
    /// A var-int prefixed string could not be read from a cursor.
    #[snafu(display("Unable to read VarInt string, {}", source))]
    ReadVarIntString { source: cursor::Error },
    /// A raw read from a cursor failed.
    #[snafu(display("Unable to read cursor data, {}", source))]
    ReadCursorData { source: std::io::Error },
    /// Statistics of the kind named by `s_type` could not be read.
    #[snafu(display("Unable to read {} type statistics, {}", s_type, source))]
    ReadStatistics {
        s_type: String,
        source: statistics::Error,
    },
}
/// Result alias for this module; the error type defaults to this module's [`Error`].
type Result<T, E = Error> = std::result::Result<T, E>;
/// Sensor-level reader that hands out chunk readers for the chunks it knows about.
#[derive(Debug)]
pub struct TsFileSensorReader<R: SectionReader> {
    /// Shared section reader used to obtain cursors over the underlying data.
    reader: Arc<R>,
    /// Chunk metadata entries, one per chunk that can be opened.
    meta: Vec<ChunkMetadata>,
}
impl<R: SectionReader> TsFileSensorReader<R> {
    /// Creates a sensor reader over `reader`.
    ///
    /// The chunk metadata lists of all entries in `meta` are concatenated, in
    /// the order given, into a single flat list of chunks.
    pub fn new(reader: Arc<R>, meta: Vec<TimeseriesMetadata>) -> Self {
        let mut chunks: Vec<ChunkMetadata> = Vec::new();
        for series in meta {
            chunks.extend(series.chunk_metadata_list());
        }
        Self {
            reader,
            meta: chunks,
        }
    }
}
impl<R: 'static + SectionReader> SensorReader for TsFileSensorReader<R> {
fn metadata(&self) -> &Vec<ChunkMetadata> {
&self.meta
}
fn number_of_chunks(&self) -> usize {
self.meta.len()
}
fn get_chunk_reader(
&self,
i: usize,
) -> std::result::Result<Box<dyn ChunkReader<Item = Box<dyn PageReader>>>, chunk::reader::Error>
{
let chunk_meta = &self.meta.get(i);
ensure!(
chunk_meta.is_some(),
GetChunkReaderI {
i,
len: self.meta.len()
}
);
let chunk = chunk_meta.unwrap();
let offset = chunk.offset_chunk_header();
let mut header_reader = self
.reader
.get_cursor(offset as u64, 1 * 1024)
.context(GetCursor {})?;
let chunk_header = ChunkHeader::try_from(header_reader.borrow_mut())?;
let first_page = header_reader.position() + offset as u64;
Ok(Box::new(DefaultChunkReader::new(
self.reader
.get_cursor(first_page, chunk_header.data_size as usize)
.context(GetCursor {})?,
chunk_header,
chunk.statistic(),
)?))
}
}
/// Chunk reader that holds one page reader per page found in the chunk data.
pub struct DefaultChunkReader {
    /// Cursor over the chunk's page data, as left after construction.
    cursor: Cursor<Vec<u8>>,
    /// Parsed header of the chunk.
    header: ChunkHeader,
    /// Page readers that have not been handed out yet.
    pages: Vec<Box<dyn PageReader>>,
    /// Chunk-level statistic supplied at construction.
    statistic: Arc<Statistic>,
}
impl DefaultChunkReader {
pub fn new(
mut cursor: Cursor<Vec<u8>>,
header: ChunkHeader,
statistic: Arc<Statistic>,
) -> Result<Self> {
let mut pages: Vec<Box<dyn PageReader>> = Vec::new();
while cursor.position() < header.data_size as u64 {
match header.chunk_type {
5 => {
let uncompressed_size =
cursor.read_unsigned_varint_32().context(ReadCursorData)?;
let compressed_size =
cursor.read_unsigned_varint_32().context(ReadCursorData)?;
let mut data = vec![0; compressed_size as usize];
cursor.read_exact(&mut data).context(ReadCursorData)?;
pages.push(Box::new(DefaultPageReader {
header: PageHeader::new(
uncompressed_size,
compressed_size,
statistic.clone(),
),
value_decoder: match header.data_type {
TSDataType::Int32 => Box::new(IntPlainDecoder::new()),
_ => Box::new(IntPlainDecoder::new()),
},
data: Cursor::new(data),
}));
}
_ => {
let uncompressed_size =
cursor.read_unsigned_varint_32().context(ReadCursorData)?;
let compressed_size =
cursor.read_unsigned_varint_32().context(ReadCursorData)?;
let page_statistic = Arc::new(match *statistic {
Statistic::Boolean(_) => Statistic::Boolean(
BooleanStatistics::try_from(cursor.borrow_mut()).context(
ReadStatistics {
s_type: "boolean".to_string(),
},
)?,
),
Statistic::Int32(_) => Statistic::Int32(
IntegerStatistics::try_from(cursor.borrow_mut()).context(
ReadStatistics {
s_type: "Int32".to_string(),
},
)?,
),
Statistic::Int64(_) => Statistic::Int64(
LongStatistics::try_from(cursor.borrow_mut()).context(
ReadStatistics {
s_type: "Int64".to_string(),
},
)?,
),
Statistic::FLOAT(_) => Statistic::FLOAT(
FloatStatistics::try_from(cursor.borrow_mut()).context(
ReadStatistics {
s_type: "FLOAT".to_string(),
},
)?,
),
Statistic::DOUBLE(_) => Statistic::DOUBLE(
DoubleStatistics::try_from(cursor.borrow_mut()).context(
ReadStatistics {
s_type: "DOUBLE".to_string(),
},
)?,
),
Statistic::TEXT(_) => Statistic::TEXT(
BinaryStatistics::try_from(cursor.borrow_mut()).context(
ReadStatistics {
s_type: "TEXT".to_string(),
},
)?,
),
});
let mut data = vec![0; compressed_size as usize];
cursor.read_exact(&mut data).context(ReadCursorData)?;
pages.push(Box::new(DefaultPageReader {
header: PageHeader::new(uncompressed_size, compressed_size, page_statistic),
value_decoder: match header.data_type {
TSDataType::Int32 => Box::new(IntPlainDecoder::new()),
_ => Box::new(IntPlainDecoder::new()),
},
data: Cursor::new(data),
}));
}
}
}
Ok(Self {
cursor,
header,
pages,
statistic,
})
}
}
impl Iterator for DefaultChunkReader {
    type Item = Box<dyn PageReader>;

    /// Yields the next page reader, in the order the pages were read from the chunk.
    ///
    /// Pages are stored in read order, so taking from the back (as `Vec::pop`
    /// does) would hand them out last-to-first.
    fn next(&mut self) -> Option<Self::Item> {
        if self.pages.is_empty() {
            None
        } else {
            // `remove(0)` shifts the remaining entries (linear in the number of
            // pages left); switch the storage to a `VecDeque` if that ever matters.
            Some(self.pages.remove(0))
        }
    }

    /// The number of pages still to be yielded is known exactly.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let left = self.pages.len();
        (left, Some(left))
    }
}
// Marks `DefaultChunkReader` as a `ChunkReader`. No trait items are provided
// here; the pages themselves are yielded by its `Iterator` impl.
impl ChunkReader for DefaultChunkReader {}
impl PageReader for DefaultPageReader {
fn header(&self) -> &PageHeader {
&self.header
}
fn data(&self) -> Result<(Vec<Field>, Vec<Field>)> {
let mut data = Cursor::new(self.data.un_compress().context(DecompressChunkData)?);
let time_len = data.read_unsigned_varint_32().expect("123");
let mut time_data: Vec<u8> = vec![0; time_len as usize];
data.read_exact(&mut time_data);
let time = LongBinaryDecoder::new()
.decode(&mut Cursor::new(time_data))
.expect("123");
let data = self
.value_decoder
.decode(&mut data)
.context(DecodePageData)?;
Ok((time, data))
}
}
/// Page reader that keeps the still-compressed page payload and decodes it on demand.
pub struct DefaultPageReader {
    /// Sizes and statistic recorded for this page.
    header: PageHeader,
    /// Decoder applied to the value section of the decompressed page.
    value_decoder: Box<dyn Decoder>,
    /// Page payload exactly as it was read from the chunk.
    data: Cursor<Vec<u8>>,
}
/// Sizes and statistic describing a single page.
#[derive(Debug)]
pub struct PageHeader {
    /// Size of the page once decompressed, as recorded in the file.
    uncompressed_size: u32,
    /// Number of payload bytes stored for the page.
    compressed_size: u32,
    /// Statistic associated with the page.
    statistics: Arc<Statistic>,
}
impl PageHeader {
pub fn new(uncompressed_size: u32, compressed_size: u32, statistics: Arc<Statistic>) -> Self {
Self {
uncompressed_size,
compressed_size,
statistics,
}
}
}
/// Header found at the start of a chunk, in the order its fields are stored.
pub struct ChunkHeader {
    /// Leading marker byte of the chunk.
    chunk_type: u8,
    /// Measurement identifier, stored as a var-int prefixed string.
    measurement_id: String,
    /// Byte length of the page data that follows the header.
    data_size: u32,
    /// Data type of the values, decoded from a single byte.
    data_type: TSDataType,
    /// Compression scheme, decoded from a single byte.
    compression_type: CompressionType,
    /// Value encoding, decoded from a single byte.
    encoding_type: TSEncoding,
}
impl TryFrom<&mut Cursor<Vec<u8>>> for ChunkHeader {
    type Error = Error;

    /// Parses a chunk header from the cursor's current position.
    ///
    /// The cursor is left positioned directly after the header.
    fn try_from(cursor: &mut Cursor<Vec<u8>>) -> std::result::Result<Self, Self::Error> {
        // Struct-literal fields are evaluated top to bottom, which is exactly
        // the order in which the fields are laid out in the header.
        Ok(Self {
            chunk_type: cursor.read_u8().context(ReadCursorData)?,
            measurement_id: cursor.read_varint_string().context(ReadVarIntString)?,
            data_size: cursor.read_unsigned_varint_32().context(ReadCursorData)?,
            data_type: TSDataType::new(cursor.read_u8().context(ReadCursorData)?),
            compression_type: CompressionType::new(cursor.read_u8().context(ReadCursorData)?),
            encoding_type: TSEncoding::new(cursor.read_u8().context(ReadCursorData)?),
        })
    }
}
/// Compression schemes that a chunk header can name.
pub enum CompressionType {
    Uncompressed,
    Snappy,
    Gzip,
    Lzo,
    Sdt,
    Paa,
    Pla,
    LZ4,
}

impl CompressionType {
    /// Maps the one-byte identifier from a chunk header to a compression type.
    ///
    /// Identifiers `0` through `6` select the variants in declaration order;
    /// every identifier from `7` upwards is treated as [`CompressionType::LZ4`].
    pub fn new(id: u8) -> Self {
        match id {
            0 => CompressionType::Uncompressed,
            1 => CompressionType::Snappy,
            2 => CompressionType::Gzip,
            3 => CompressionType::Lzo,
            4 => CompressionType::Sdt,
            5 => CompressionType::Paa,
            6 => CompressionType::Pla,
            7..=u8::MAX => CompressionType::LZ4,
        }
    }
}
/// Value encodings that a chunk header can name.
pub enum TSEncoding {
    Plain,
    PlainDictionary,
    Rle,
    Diff,
    Ts2diff,
    Bitmap,
    GorillaV1,
    Regular,
    Gorilla,
}

impl TSEncoding {
    /// Maps the one-byte identifier from a chunk header to an encoding.
    ///
    /// Identifiers `0` through `7` select the variants in declaration order;
    /// every identifier from `8` upwards is treated as [`TSEncoding::Gorilla`].
    pub fn new(id: u8) -> Self {
        match id {
            0 => TSEncoding::Plain,
            1 => TSEncoding::PlainDictionary,
            2 => TSEncoding::Rle,
            3 => TSEncoding::Diff,
            4 => TSEncoding::Ts2diff,
            5 => TSEncoding::Bitmap,
            6 => TSEncoding::GorillaV1,
            7 => TSEncoding::Regular,
            8..=u8::MAX => TSEncoding::Gorilla,
        }
    }
}