use std::io::{Seek, SeekFrom, Read, Write};
use std::fs::File;
use crc64fast;
use std::ops::Bound::*;
use std::collections::BTreeMap;
use crate::Error;
use crate::store::Span;
pub(crate) const MAX_RECORD_SIZE: usize = 1 << 24;
const RECORD_HDR_SIZE: usize = 8 + 3;
pub(crate) struct RecordHeader {
pub logical_offset: u64,
pub length: u64,
}
pub(crate) struct Record {
pub hdr: RecordHeader,
pub file_data_offset: u64,
}
fn debug_check_spans(spans: &BTreeMap<u64, Span>)
{
let mut prev_end = None;
for (&off, span) in spans {
debug_assert!(span.len > 0);
let end = off + span.len;
if let Some(pe) = prev_end {
debug_assert!(off >= pe);
}
prev_end = Some(end);
}
}
fn read_bytes_fail_back(file: &mut File,
buf: &mut [u8],
total_read: &mut u64) -> Result<bool, Error>
{
let length = file.read(buf)?;
*total_read += length as u64;
if length == buf.len() {
return Ok(true);
}
if length != 0 {
file.seek_relative(-(*total_read as i64))?;
}
return Ok(false);
}
pub(crate) fn validate(file: &mut File,
data_offset: u64,
data_length: usize) -> Result<bool, Error>
{
let mut bytes = vec![0u8; RECORD_HDR_SIZE + data_length + 8];
file.seek(SeekFrom::Start(data_offset - RECORD_HDR_SIZE as u64))?;
file.read_exact(&mut bytes)?;
let mut d = crc64fast::Digest::new();
d.write(&bytes[..RECORD_HDR_SIZE + data_length]);
let csum_start = bytes.len() - 8;
Ok(d.sum64() == u64::from_le_bytes(bytes[csum_start..csum_start + 8].try_into().unwrap()))
}
pub(crate) fn read_next_record(file: &mut File, file_offset: &mut u64) -> Result<Option<Record>, Error>
{
let mut hdrbytes = [0u8; RECORD_HDR_SIZE];
let mut total_read: u64 = 0;
if !read_bytes_fail_back(file, &mut hdrbytes, &mut total_read)? {
return Ok(None);
}
let len24 = (hdrbytes[8] as u32) | ((hdrbytes[9] as u32) << 8) | ((hdrbytes[10] as u32) << 16);
let rhdr = RecordHeader {
logical_offset: u64::from_le_bytes(hdrbytes[..8].try_into().unwrap()),
length: len24 as u64,
};
let rec = Record {
hdr: rhdr,
file_data_offset: *file_offset + hdrbytes.len() as u64,
};
let mut data = vec![0u8; rec.hdr.length as usize];
if !read_bytes_fail_back(file, &mut data, &mut total_read)? {
return Ok(None);
}
let mut tlrbytes = [0u8; 8];
if !read_bytes_fail_back(file, &mut tlrbytes, &mut total_read)? {
return Ok(None);
}
let mut d = crc64fast::Digest::new();
d.write(&hdrbytes);
d.write(&data);
if d.sum64() != u64::from_le_bytes(tlrbytes.try_into().unwrap()) {
file.seek_relative(-(total_read as i64))?;
return Ok(None);
}
*file_offset += total_read;
return Ok(Some(rec));
}
pub(crate) fn write_record(file: &mut File,
logical_offset: u64,
data: &[u8],
file_size: &mut u64)
-> Result<u64, Error>
{
let offhdr = logical_offset.to_le_bytes();
let len = data.len();
debug_assert!(len < MAX_RECORD_SIZE);
debug_assert!(MAX_RECORD_SIZE - 1 <= 0x00FF_FFFF);
let lenhdr = [(len & 0xFF) as u8,
((len >> 8) & 0xFF) as u8,
((len >> 16) & 0xFF) as u8];
file.write_all(&offhdr)?;
file.write_all(&lenhdr)?;
let data_off = *file_size + offhdr.len() as u64 + lenhdr.len() as u64;
file.write_all(data)?;
let mut d = crc64fast::Digest::new();
d.write(&offhdr);
d.write(&lenhdr);
d.write(data);
let tlr = u64::to_le_bytes(d.sum64());
file.write_all(&tlr)?;
*file_size = data_off + data.len() as u64 + tlr.len() as u64;
Ok(data_off)
}
fn split_span(spans: &mut BTreeMap<u64, Span>, logical_offset: u64)
{
if let Some((&offset, span)) = spans.range((Included(0), Excluded(logical_offset))).next_back() {
if offset + span.len > logical_offset {
let before_len = logical_offset - offset;
assert!(span.validated);
let newspan = Span { len: span.len - before_len,
file_data_offset: span.file_data_offset + before_len,
validated: span.validated };
spans.insert(logical_offset, newspan);
spans.get_mut(&offset).unwrap().len = before_len;
}
}
}
pub(crate) fn add_record(spans: &mut BTreeMap<u64, Span>,
logical_offset: u64,
len: u64,
file_data_offset: u64,
validated: bool)
{
split_span(spans, logical_offset);
split_span(spans, logical_offset + len);
let overlaps: Vec<u64> = spans
.range((Included(logical_offset), Excluded(logical_offset + len)))
.map(|(&k, _)| k)
.collect();
for k in overlaps {
spans.remove(&k);
}
spans.insert(logical_offset, Span { len: len,
file_data_offset: file_data_offset,
validated: validated,
});
debug_check_spans(spans);
}