use crate::writer::encoding::time_encoder::LongTs2DiffEncoder;
use crate::writer::encoding::Encoder;
use crate::writer::statistics::Statistics;
use crate::writer::tsfile_io_writer::TsFileIoWriter;
use crate::writer::utils::{size_var_i32, size_var_u32};
use crate::writer::{
utils, write_str, CompressionType, IoTDBValue, PositionedWrite, Serializable, TSDataType,
TSEncoding, TsFileError, CHUNK_HEADER, ONLY_ONE_PAGE_CHUNK_HEADER,
};
use snap::raw::max_compress_len;
use std::fmt::{Display, Formatter};
use std::io;
use std::io::Write;
// Hard upper bound on the number of points a single page may hold; once
// exceeded, the page is flushed regardless of its encoded size.
const MAX_NUMBER_OF_POINTS_IN_PAGE: u32 = 1048576;
// Point count at which the page size is first estimated. NOTE(review):
// value presumably mirrors the upstream IoTDB writer default — confirm.
const VALUE_COUNT_IN_ONE_PAGE_FOR_NEXT_CHECK: u32 = 7989;
// Target maximum encoded page size in bytes (64 KiB); exceeding it seals
// the current page.
const PAGE_SIZE_THRESHOLD: u32 = 65536;
// Floor for the adaptive "next size check" counter after a page is sealed.
const MINIMUM_RECORD_COUNT_FOR_CHECK: u32 = 1500;
// Accumulates a single page of (timestamp, value) points: encoders for the
// time and value columns, running statistics, and the serialized payload.
struct PageWriter {
    // Delta-of-delta (TS_2DIFF) encoder for the timestamp column.
    time_encoder: LongTs2DiffEncoder,
    // Encoder for the value column, chosen from data type + encoding.
    value_encoder: Box<dyn Encoder>,
    // Data type of this series; needed to rebuild statistics on reset.
    data_type: TSDataType,
    // Running per-page statistics, merged into the chunk on flush.
    statistics: Statistics,
    // Number of points written into the current page.
    point_number: u32,
    // Serialized page payload, filled by `prepare_buffer`.
    buffer: Vec<u8>,
}
impl PageWriter {
    /// Creates a page writer for `data_type`, building the matching value
    /// encoder for `encoding` (fails for unsupported combinations).
    fn new(data_type: TSDataType, encoding: TSEncoding) -> Result<PageWriter, TsFileError> {
        Ok(PageWriter {
            time_encoder: LongTs2DiffEncoder::new(),
            value_encoder: <dyn Encoder>::new(data_type, encoding)?,
            data_type,
            statistics: Statistics::new(data_type),
            // Pre-size to the page threshold so typical pages never reallocate.
            buffer: Vec::with_capacity(65536),
            point_number: 0,
        })
    }

    /// Clears per-page state so the writer can start a fresh page.
    /// Note: `buffer` is cleared separately by the chunk writer after flushing.
    pub(crate) fn reset(&mut self) {
        self.statistics = Statistics::new(self.data_type);
        self.time_encoder.reset();
        self.value_encoder.reset();
        self.point_number = 0;
    }

    /// Upper bound (in bytes) of this page's serialized size: the encoders'
    /// current payloads plus their worst-case pending tails.
    pub(crate) fn estimate_max_mem_size(&mut self) -> u32 {
        let time_encoder_size = self.time_encoder.size();
        let value_encoder_size = self.value_encoder.size();
        let time_encoder_max_size = self.time_encoder.get_max_byte_size();
        let value_encoder_max_size = self.value_encoder.get_max_byte_size();
        let max_size =
            time_encoder_size + value_encoder_size + time_encoder_max_size + value_encoder_max_size;
        log::trace!("Max size estimated for page writer: {}", max_size);
        max_size
    }

    /// Appends one (timestamp, value) point and updates the statistics.
    /// Currently infallible; the `Result` is kept for interface stability.
    /// The error lifetime is `'static` so the return value no longer ties
    /// up the `&mut self` borrow (the elided lifetime previously did).
    fn write(&mut self, timestamp: i64, value: &mut IoTDBValue) -> Result<u32, &'static str> {
        // Fix: the source contained a mis-encoded `×tamp` here; the
        // intended expression is `&timestamp.into()`.
        self.time_encoder.write(&timestamp.into());
        self.value_encoder.write(value);
        self.statistics.update(timestamp, value);
        self.point_number += 1;
        Ok(1)
    }

    /// Serializes the page payload into `self.buffer`:
    /// var-u32 length of the time section, the time section, then the values.
    pub(crate) fn prepare_buffer(&mut self) {
        self.buffer.clear();
        let mut time_buffer = vec![];
        self.time_encoder.serialize(&mut time_buffer);
        crate::writer::write_var_u32(time_buffer.len() as u32, &mut self.buffer);
        // Writing into an in-memory Vec cannot fail; `extend_from_slice`
        // avoids the silently ignored io::Result that `write_all` produced.
        self.buffer.extend_from_slice(&time_buffer);
        self.value_encoder.serialize(&mut self.buffer);
    }
}
// Buffers and serializes all pages of one measurement's chunk before it is
// flushed to a TsFile.
pub struct ChunkWriter {
    // Name of the measurement (series) this chunk belongs to.
    pub(crate) measurement_id: String,
    // Data type of the series' values.
    pub(crate) data_type: TSDataType,
    // Compression applied to each page payload (UNCOMPRESSED or SNAPPY).
    pub compression_type: CompressionType,
    // Value encoding used by the page writers.
    pub encoding: TSEncoding,
    // OR-ed into the chunk-header marker byte on serialization.
    #[allow(dead_code)]
    pub(crate) mask: u8,
    // File offset of the chunk header; None until the chunk is flushed.
    #[allow(dead_code)]
    offset_of_chunk_header: Option<u64>,
    // Chunk-level statistics, merged from every sealed page.
    pub(crate) statistics: Statistics,
    // The page currently being filled; created lazily on first write.
    current_page_writer: Option<PageWriter>,
    // Serialized bytes of all sealed pages (headers + payloads).
    page_buffer: Vec<u8>,
    // Number of pages sealed into `page_buffer` so far.
    num_pages: u32,
    // Statistics of the first page, held back because a single-page chunk
    // omits per-page statistics; spliced in when a second page arrives.
    first_page_statistics: Option<Statistics>,
    // Adaptive point count at which the next page-size check happens.
    value_count_in_one_page_for_next_check: u32,
    // Byte length of the first page's header (before statistics), used to
    // re-split the buffer when the second page is added.
    size_without_statistics: usize,
}
impl ChunkWriter {
/// Builds an empty chunk writer for one measurement with the given data
/// type, page compression, and value encoding.
pub fn new(
    measurement_id: &str,
    data_type: TSDataType,
    compression_type: CompressionType,
    encoding: TSEncoding,
) -> ChunkWriter {
    ChunkWriter {
        measurement_id: String::from(measurement_id),
        data_type,
        compression_type,
        encoding,
        mask: 0,
        offset_of_chunk_header: None,
        statistics: Statistics::new(data_type),
        current_page_writer: None,
        page_buffer: Vec::new(),
        num_pages: 0,
        first_page_statistics: None,
        value_count_in_one_page_for_next_check: VALUE_COUNT_IN_ONE_PAGE_FOR_NEXT_CHECK,
        size_without_statistics: 0,
    }
}
/// Builds the chunk metadata entry for this writer.
///
/// # Panics
/// Panics when called before the chunk has been flushed and the header
/// offset recorded.
#[allow(dead_code)]
pub(crate) fn get_metadata(&self) -> ChunkMetadata {
    // The header offset only exists once the chunk has hit the file.
    let offset = match self.offset_of_chunk_header {
        Some(offset) => offset as i64,
        None => panic!("get_metadata called before offset is defined"),
    };
    ChunkMetadata {
        measurement_id: self.measurement_id.clone(),
        data_type: self.data_type,
        mask: 0,
        offset_of_chunk_header: offset,
        statistics: self.statistics.clone(),
    }
}
pub(crate) fn seal_current_page(&mut self) {
match &self.current_page_writer {
None => {}
Some(pw) => {
if pw.point_number > 0 {
self.write_page_to_buffer();
}
}
}
}
/// Total number of bytes the serialized chunk (header plus page data)
/// will occupy, or 0 when no page has been flushed yet.
pub(crate) fn get_serialized_chunk_size(&self) -> u64 {
    if self.page_buffer.is_empty() {
        return 0;
    }
    let measurement_length = self.measurement_id.len() as i32;
    let data_size = self.page_buffer.len() as u64;
    // 1 marker byte, var-int measurement-id length + its bytes, var-int
    // data size, then one byte each for data type, compression, encoding.
    let header_size = 1_u64
        + size_var_i32(measurement_length) as u64
        + measurement_length as u64
        + size_var_u32(self.page_buffer.len() as u32) as u64
        + 1_u64
        + 1_u64
        + 1_u64;
    header_size + data_size
}
/// Seals the open page, flushes every buffered page of this chunk through
/// `file_writer`, then resets all per-chunk state for reuse.
pub(crate) fn write_to_file_writer<T: PositionedWrite>(
    &mut self,
    file_writer: &mut TsFileIoWriter<T>,
) {
    self.seal_current_page();
    self.write_all_pages_of_chunk_to_ts_file(file_writer, &self.statistics);
    // Clear per-chunk state so the next chunk starts from scratch.
    self.statistics = Statistics::new(self.data_type);
    self.first_page_statistics = None;
    self.num_pages = 0;
    self.page_buffer.clear();
}
fn write_all_pages_of_chunk_to_ts_file<T: PositionedWrite>(
&self,
file_writer: &mut TsFileIoWriter<T>,
statistics: &Statistics,
) {
if statistics.count() == 0 {
return;
}
file_writer.start_flush_chunk(
self.measurement_id.clone(),
self.compression_type,
self.data_type,
self.encoding,
statistics.clone(),
self.page_buffer.len() as u32,
self.num_pages,
0,
);
let data_offset = file_writer.out.get_position();
log::trace!("Dumping pages at offset {}", data_offset);
file_writer.out.write_all(&self.page_buffer);
log::trace!("Offset after {}", file_writer.out.get_position());
file_writer.end_current_chunk();
}
/// Estimated worst-case memory footprint of this series: the bytes already
/// sealed into `page_buffer` plus the open page's estimate and its
/// statistics overhead. Returns 0 when no page is open.
pub(crate) fn estimate_max_series_mem_size(&mut self) -> u32 {
    let page_writer = match &mut self.current_page_writer {
        None => return 0,
        Some(pw) => pw,
    };
    let page_size = page_writer.estimate_max_mem_size();
    // 2 * (4 + 1) header bytes — presumably two var-u32 page-size fields
    // at worst-case width plus a tag each; TODO confirm against format.
    let statistics_size = 2 * (4 + 1) + page_writer.statistics.get_serialized_size();
    let total = self.page_buffer.len() as u32 + page_size + statistics_size;
    log::trace!("Estimated max series mem size: {}", total);
    total
}
/// Appends one (timestamp, value) point to the chunk, lazily creating the
/// page writer on first use, then checks whether the current page should
/// be sealed.
///
/// Returns the number of records written (currently always 1).
///
/// # Errors
/// Returns an error if the page writer cannot be constructed for this
/// data type / encoding combination.
pub fn write(&mut self, timestamp: i64, mut value: IoTDBValue) -> Result<u32, TsFileError> {
    // Lazily create the page writer; this replaces the original
    // double-match whose second `None` arm was unreachable.
    if self.current_page_writer.is_none() {
        self.current_page_writer = Some(PageWriter::new(self.data_type, self.encoding)?);
    }
    let page_writer = self
        .current_page_writer
        .as_mut()
        .expect("page writer initialized above");
    let records_written = page_writer
        .write(timestamp, &mut value)
        .expect("page writer write is infallible");
    self.check_page_size_and_may_open_new_page();
    Ok(records_written)
}
fn check_page_size_and_may_open_new_page(&mut self) {
if self.current_page_writer.is_none() {
return;
}
let page_writer = self.current_page_writer.as_mut().unwrap();
if page_writer.point_number > MAX_NUMBER_OF_POINTS_IN_PAGE {
self.write_page_to_buffer();
} else if page_writer.point_number >= self.value_count_in_one_page_for_next_check {
let current_page_size = page_writer.estimate_max_mem_size();
if current_page_size > PAGE_SIZE_THRESHOLD {
log::trace!(
"enough size, write page {}, pageSizeThreshold:{}, currentPateSize:{}, valueCountInOnePage:{}",
self.measurement_id.clone(),
PAGE_SIZE_THRESHOLD,
current_page_size,
page_writer.point_number);
self.write_page_to_buffer();
self.value_count_in_one_page_for_next_check = MINIMUM_RECORD_COUNT_FOR_CHECK;
} else {
self.value_count_in_one_page_for_next_check =
((PAGE_SIZE_THRESHOLD as f32) / (current_page_size as f32)
* (page_writer.point_number as f32)) as u32;
}
}
}
/// Serializes the current page (header + payload, optionally compressed)
/// into `page_buffer`, merges the page statistics into the chunk
/// statistics, and resets the page writer.
///
/// Layout subtlety: a single-page chunk omits per-page statistics (they
/// would duplicate the chunk statistics). When a second page arrives, the
/// whole buffer is rebuilt so the first page's held-back statistics can be
/// spliced in right after its header. From the third page on, pages are
/// simply appended with their statistics inline.
fn write_page_to_buffer(&mut self) -> Result<(), TsFileError> {
    if let Some(page_writer) = self.current_page_writer.as_mut() {
        // Encode the page payload into the page writer's own buffer.
        page_writer.prepare_buffer();
        let buffer_size: u32 = page_writer.buffer.len() as u32;
        let uncompressed_bytes = buffer_size;
        let mut compressed_buffer: Vec<u8> = Vec::new();
        // Compressed size of the payload; equals the uncompressed size
        // when no compression is configured.
        let compressed_bytes = match self.compression_type {
            CompressionType::UNCOMPRESSED => uncompressed_bytes,
            CompressionType::SNAPPY => {
                let mut encoder1 = snap::raw::Encoder::new();
                // Snappy requires a worst-case-sized output buffer up front.
                let out_buffer_len = max_compress_len(page_writer.buffer.len());
                let mut out = vec![0; out_buffer_len];
                match encoder1.compress(&page_writer.buffer, out.as_mut_slice()) {
                    Ok(size) => {
                        // Copy only the bytes actually produced.
                        let mut reader = &out.as_mut_slice()[..size];
                        io::copy(&mut reader, &mut compressed_buffer);
                        size as u32
                    }
                    Err(_) => {
                        return Err(TsFileError::Compression);
                    }
                }
            }
        };
        if self.num_pages == 0 {
            // First page: write the header WITHOUT statistics, recording
            // how many bytes it took so it can be re-split later, and hold
            // the statistics back in `first_page_statistics`.
            self.size_without_statistics +=
                utils::write_var_u32(uncompressed_bytes as u32, &mut self.page_buffer)?
                    as usize;
            self.size_without_statistics +=
                utils::write_var_u32(compressed_bytes as u32, &mut self.page_buffer)? as usize;
            match self.compression_type {
                CompressionType::UNCOMPRESSED => {
                    self.page_buffer.write_all(&page_writer.buffer);
                }
                _ => {
                    self.page_buffer.write_all(&compressed_buffer);
                }
            }
            page_writer.buffer.clear();
            self.first_page_statistics = Some(page_writer.statistics.clone())
        } else if self.num_pages == 1 {
            // Second page: rebuild the buffer as
            //   first header | first statistics | first payload,
            // then append the second page with its statistics inline.
            let temp = self.page_buffer.clone();
            self.page_buffer.clear();
            log::trace!("Page Buffer offset: {}", self.page_buffer.get_position());
            // First page's header bytes (sizes only, no statistics).
            let header_bytes = &temp[0..self.size_without_statistics];
            self.page_buffer.write_all(header_bytes);
            log::trace!("Page Buffer offset: {}", self.page_buffer.get_position());
            // Splice in the statistics that were held back for page one.
            match &self.first_page_statistics {
                Some(stat) => stat.serialize(&mut self.page_buffer),
                _ => panic!("This should not happen!"),
            };
            log::trace!("Page Buffer offset: {}", self.page_buffer.get_position());
            // Remainder of the old buffer: the first page's payload.
            let remainder_bytes = &temp[self.size_without_statistics..];
            self.page_buffer.write_all(remainder_bytes);
            log::trace!("Page Buffer offset: {}", self.page_buffer.get_position());
            // Now append the second page: header, statistics, payload.
            utils::write_var_u32(uncompressed_bytes as u32, &mut self.page_buffer);
            utils::write_var_u32(compressed_bytes as u32, &mut self.page_buffer);
            log::trace!("Page Buffer offset: {}", self.page_buffer.get_position());
            log::trace!("Statistics: {:?}", &page_writer.statistics);
            page_writer.statistics.serialize(&mut self.page_buffer);
            log::trace!(
                "Flushing page at page buffer offset {}",
                self.page_buffer.get_position()
            );
            let pos_before_flush = self.page_buffer.get_position();
            match self.compression_type {
                CompressionType::UNCOMPRESSED => {
                    self.page_buffer.write_all(&page_writer.buffer);
                }
                _ => {
                    self.page_buffer.write_all(&compressed_buffer);
                }
            }
            let pos_after_flush = self.page_buffer.get_position();
            log::trace!(
                "Wrote {} bytes to page buffer",
                pos_after_flush - pos_before_flush
            );
            log::trace!("Page Buffer offset: {}", self.page_buffer.get_position());
            page_writer.buffer.clear();
            // The held-back statistics are now in the buffer.
            self.first_page_statistics = None;
        } else {
            // Third page onward: append header, statistics, then payload.
            utils::write_var_u32(uncompressed_bytes as u32, &mut self.page_buffer);
            utils::write_var_u32(compressed_bytes as u32, &mut self.page_buffer);
            page_writer.statistics.serialize(&mut self.page_buffer);
            let pos_before_flush = self.page_buffer.get_position();
            match self.compression_type {
                CompressionType::UNCOMPRESSED => {
                    self.page_buffer.write_all(&page_writer.buffer);
                }
                _ => {
                    self.page_buffer.write_all(&compressed_buffer);
                }
            }
            let pos_after_flush = self.page_buffer.get_position();
            log::trace!(
                "Wrote {} bytes to page buffer",
                pos_after_flush - pos_before_flush
            );
            page_writer.buffer.clear();
        }
        self.num_pages += 1;
        // Fold this page's statistics into the chunk-level statistics and
        // reset the page writer for the next page.
        self.statistics.merge(&page_writer.statistics);
        page_writer.reset();
    }
    Ok(())
}
}
// On-disk header of a chunk, written immediately before its page data.
pub struct ChunkHeader {
    // Measurement (series) name this chunk belongs to.
    pub measurement_id: String,
    // Size in bytes of the page data that follows the header.
    pub data_size: u32,
    // Data type of the series' values.
    pub data_type: TSDataType,
    // Compression applied to the page payloads.
    pub compression: CompressionType,
    // Value encoding used inside the pages.
    pub encoding: TSEncoding,
    // Number of pages in the chunk; selects the header marker byte.
    pub num_pages: u32,
    // OR-ed into the marker byte on serialization.
    pub mask: u8,
}
impl ChunkHeader {
    /// Writes the header to `file_writer`: marker byte (OR-ed with the
    /// mask), measurement id, var-u32 data size, then one byte each for
    /// data type, compression, and encoding.
    pub(crate) fn serialize<T: PositionedWrite>(
        &self,
        file_writer: &mut T,
    ) -> Result<(), TsFileError> {
        // Chunks with at most one page use a dedicated marker value.
        let base_marker = if self.num_pages <= 1 {
            ONLY_ONE_PAGE_CHUNK_HEADER
        } else {
            CHUNK_HEADER
        };
        file_writer.write_all(&[base_marker | self.mask])?;
        write_str(file_writer, self.measurement_id.as_str())?;
        utils::write_var_u32(self.data_size, file_writer)?;
        // Trailing single-byte fields, in on-disk order.
        for byte in [
            self.data_type.serialize(),
            self.compression.serialize(),
            self.encoding.serialize(),
        ] {
            file_writer.write_all(&[byte])?;
        }
        Ok(())
    }

    /// Plain constructor; field order mirrors the struct definition.
    pub(crate) fn new(
        measurement_id: String,
        data_size: u32,
        data_type: TSDataType,
        compression: CompressionType,
        encoding: TSEncoding,
        num_pages: u32,
        mask: u8,
    ) -> ChunkHeader {
        ChunkHeader {
            measurement_id,
            data_size,
            data_type,
            compression,
            encoding,
            num_pages,
            mask,
        }
    }
}
// Index entry describing one chunk: where its header lives in the file and
// the statistics of the data it contains.
#[derive(Clone)]
pub struct ChunkMetadata {
    // Measurement (series) name the chunk belongs to.
    pub(crate) measurement_id: String,
    // Data type of the series' values.
    pub(crate) data_type: TSDataType,
    // Marker mask byte carried alongside the metadata.
    pub(crate) mask: u8,
    // Absolute file offset of the chunk header, serialized big-endian.
    offset_of_chunk_header: i64,
    // Chunk-level statistics, optionally serialized with the entry.
    pub(crate) statistics: Statistics,
}
impl ChunkMetadata {
    /// Creates metadata for a chunk whose header starts at `position`.
    pub(crate) fn new(
        measurement_id: String,
        data_type: TSDataType,
        position: u64,
        statistics: Statistics,
        mask: u8,
    ) -> ChunkMetadata {
        ChunkMetadata {
            measurement_id,
            data_type,
            mask,
            offset_of_chunk_header: position as i64,
            statistics,
        }
    }

    /// Serializes the header offset (big-endian i64) and, when
    /// `serialize_statistics` is set, the statistics.
    ///
    /// Fix: the original captured the offset-write result but went on to
    /// serialize statistics even after a failed write; now it returns
    /// early on error via `?`.
    pub(crate) fn serialize(
        &self,
        file: &mut dyn PositionedWrite,
        serialize_statistics: bool,
    ) -> io::Result<()> {
        file.write_all(&self.offset_of_chunk_header.to_be_bytes())?;
        if serialize_statistics {
            self.statistics.serialize(file);
        }
        Ok(())
    }
}
impl Display for ChunkMetadata {
    /// Renders the metadata as its measurement id followed by a
    /// placeholder for the remaining fields, e.g. `temperature (...)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.measurement_id)?;
        f.write_str(" (...)")
    }
}