use crate::datatypes::values::Value;
use crate::graph::schema::InternedKey;
use crate::graph::storage::type_build_meta::ColType;
use chrono::NaiveDate;
use memmap2::MmapMut;
use std::collections::HashMap;
use std::sync::Arc;
/// Calendar date of the Unix epoch (1970-01-01).
///
/// Date values in this file are decoded as a signed number of days added to
/// this date.
const UNIX_EPOCH_DATE: NaiveDate = if let Some(epoch) = NaiveDate::from_ymd_opt(1970, 1, 1) {
    epoch
} else {
    // 1970-01-01 is a valid calendar date, so this arm is never evaluated.
    unreachable!()
};
/// A contiguous byte range inside the backing memory map.
#[derive(Clone, Copy, Debug)]
pub struct Region {
    /// Byte position in the map where the range begins.
    pub offset: usize,
    /// Length of the range in bytes.
    pub len: usize,
}

impl Region {
    /// Zero-length region located at offset 0.
    pub const EMPTY: Region = Self { offset: 0, len: 0 };
}
/// Layout description of one fixed-width column.
#[derive(Clone, Debug)]
pub struct FixedColumnMeta {
    /// Element type; selects how the bytes in `data` are decoded.
    pub col_type: ColType,
    /// Region holding the packed per-row values.
    pub data: Region,
    /// Region holding one flag byte per row; a non-zero byte marks the row null.
    pub nulls: Region,
}
/// Layout description of one variable-length string column.
#[derive(Clone, Debug)]
pub struct StrColumnMeta {
    /// Region holding the concatenated string bytes of all rows.
    pub data: Region,
    /// Region of `u64` end offsets into `data`, one entry per row; a row's
    /// start is the previous row's end (0 for the first row).
    pub offsets: Region,
    /// Region holding one flag byte per row; a non-zero byte marks the row null.
    pub nulls: Region,
}
/// Locates a property's column: an index into either the fixed-width column
/// list or the string column list of the store.
#[derive(Clone, Copy, Debug)]
pub(crate) enum ColRef {
    /// Index into the store's `fixed_cols`.
    Fixed(usize),
    /// Index into the store's `str_cols`.
    Str(usize),
}
/// Read-side view of a column store whose data lives in a shared memory map.
///
/// Every [`Region`] held by this struct is a byte range inside `mmap`.
#[derive(Clone, Debug)]
pub struct MmapColumnStore {
    /// Shared backing map that all regions index into.
    pub(crate) mmap: Arc<MmapMut>,
    /// Number of rows; accessors reject row ids at or above this value.
    pub(crate) row_count: u32,
    /// Selects which id column is consulted: `id_str` when true, `id_fixed` otherwise.
    pub(crate) id_is_string: bool,
    /// Id column read as `u32` values when `id_is_string` is false.
    pub(crate) id_fixed: Option<FixedColumnMeta>,
    /// Id column read as strings when `id_is_string` is true.
    pub(crate) id_str: Option<StrColumnMeta>,
    /// Title column; treated as absent when both its null and data regions are empty.
    pub(crate) title: StrColumnMeta,
    /// Maps a property key to the column that stores it.
    pub(crate) col_map: HashMap<InternedKey, ColRef>,
    /// Fixed-width columns, addressed by `ColRef::Fixed`.
    pub(crate) fixed_cols: Vec<FixedColumnMeta>,
    /// String columns, addressed by `ColRef::Str`.
    pub(crate) str_cols: Vec<StrColumnMeta>,
    /// Table of `u64` offsets into `overflow_data`; `row_count + 1` entries are
    /// required before any overflow blob is read.
    pub(crate) overflow_offsets: Region,
    /// Concatenated per-row overflow blobs.
    pub(crate) overflow_data: Region,
    /// When false, overflow lookups return nothing without touching the map.
    pub(crate) has_overflow: bool,
}
impl MmapColumnStore {
    /// Returns the number of rows held by this store.
    #[inline]
    pub fn row_count(&self) -> u32 {
        let Self { row_count, .. } = self;
        *row_count
    }
}
impl MmapColumnStore {
    /// Copies element `row` out of a region laid out as consecutive `N`-byte
    /// elements.
    ///
    /// Panics if the resulting byte range lies outside the memory map.
    #[inline]
    fn read_raw<const N: usize>(&self, region: &Region, row: usize) -> [u8; N] {
        let start = region.offset + row * N;
        let mut raw = [0u8; N];
        raw.copy_from_slice(&self.mmap[start..start + N]);
        raw
    }

    /// Reports whether `row` is null according to a one-byte-per-row flag region.
    ///
    /// An empty region reports every row as null; otherwise a non-zero flag
    /// byte means null.
    #[inline]
    fn read_null(&self, region: &Region, row: usize) -> bool {
        region.len == 0 || self.mmap[region.offset + row] != 0
    }

    /// Reads element `row` of a region of native-endian `i64` values.
    #[inline]
    fn read_i64(&self, region: &Region, row: usize) -> i64 {
        i64::from_ne_bytes(self.read_raw::<8>(region, row))
    }

    /// Reads element `row` of a region of native-endian `f64` values.
    #[inline]
    fn read_f64(&self, region: &Region, row: usize) -> f64 {
        f64::from_ne_bytes(self.read_raw::<8>(region, row))
    }

    /// Reads element `row` of a region of native-endian `u64` values.
    #[inline]
    fn read_u64(&self, region: &Region, row: usize) -> u64 {
        u64::from_ne_bytes(self.read_raw::<8>(region, row))
    }

    /// Reads element `row` of a region of native-endian `u32` values.
    #[inline]
    fn read_u32(&self, region: &Region, row: usize) -> u32 {
        u32::from_ne_bytes(self.read_raw::<4>(region, row))
    }

    /// Reads element `row` of a region of native-endian `i32` values.
    #[inline]
    fn read_i32(&self, region: &Region, row: usize) -> i32 {
        i32::from_ne_bytes(self.read_raw::<4>(region, row))
    }

    /// Reads byte `row` of a region of single-byte values.
    #[inline]
    fn read_u8(&self, region: &Region, row: usize) -> u8 {
        self.mmap[region.offset + row]
    }

    /// Borrows the string stored for `row`.
    ///
    /// `offsets_region` holds one `u64` end offset per row, relative to the
    /// start of `data_region`; a row begins where the previous row ended, and
    /// the first row begins at 0.
    #[inline]
    fn read_str(&self, data_region: &Region, offsets_region: &Region, row: usize) -> &str {
        let end = self.read_u64(offsets_region, row) as usize;
        let start = match row {
            0 => 0,
            _ => self.read_u64(offsets_region, row - 1) as usize,
        };
        let base = data_region.offset;
        let bytes = &self.mmap[base + start..base + end];
        // SAFETY: assumes whoever produced the map stored valid UTF-8 in string
        // data regions; nothing on this read path re-validates the bytes.
        // NOTE(review): confirm against the writer side.
        unsafe { std::str::from_utf8_unchecked(bytes) }
    }
}
impl MmapColumnStore {
/// Returns the id of `row_id` as an owned [`Value`].
///
/// Yields `None` when the row is out of range, the selected id column is
/// absent, or the id is flagged null. String ids are copied into a new
/// `String`; otherwise the id is read as a `u32` and wrapped in
/// `Value::UniqueId`.
pub fn get_id(&self, row_id: u32) -> Option<Value> {
    if row_id >= self.row_count {
        return None;
    }
    let row = row_id as usize;
    if self.id_is_string {
        let col = self.id_str.as_ref()?;
        let present = !self.read_null(&col.nulls, row);
        present.then(|| Value::String(self.read_str(&col.data, &col.offsets, row).to_string()))
    } else {
        let col = self.id_fixed.as_ref()?;
        let present = !self.read_null(&col.nulls, row);
        present.then(|| Value::UniqueId(self.read_u32(&col.data, row)))
    }
}
/// Returns the title of `row_id` as an owned `Value::String`.
///
/// Yields `None` when the row is out of range, the title column is absent
/// (both its null and data regions are empty), or the title is flagged null.
/// This is the allocating counterpart of `title_borrowed`, which performs
/// the same checks and the same read.
pub fn get_title(&self, row_id: u32) -> Option<Value> {
    self.title_borrowed(row_id)
        .map(|title| Value::String(title.to_string()))
}
/// Tests whether the string property `key` of `row_id` equals `target`
/// without allocating for column-backed values.
///
/// * Out-of-range row: `None`.
/// * Key mapped to a string column: `None` if the cell is null, otherwise
///   the comparison result.
/// * Key mapped to a fixed-width column: always `Some(false)`.
/// * Key not mapped to a column: the overflow blob is consulted; a found
///   value compares equal only when it is a string equal to `target`.
pub fn str_prop_eq(&self, row_id: u32, key: InternedKey, target: &str) -> Option<bool> {
    if row_id >= self.row_count {
        return None;
    }
    let row = row_id as usize;
    match self.col_map.get(&key) {
        Some(&ColRef::Str(idx)) => {
            let col = &self.str_cols[idx];
            if self.read_null(&col.nulls, row) {
                None
            } else {
                Some(self.read_str(&col.data, &col.offsets, row) == target)
            }
        }
        Some(&ColRef::Fixed(_)) => Some(false),
        None => self
            .get_overflow_property(row_id, key)
            .map(|value| match value {
                Value::String(text) => text.as_str() == target,
                _ => false,
            }),
    }
}
/// Returns the value of property `key` for `row_id`.
///
/// The mapped column is tried first; when the key has no column, or the
/// cell is flagged null, the row's overflow blob is searched instead.
/// Yields `None` for an out-of-range row or when neither source has a value.
pub fn get(&self, row_id: u32, key: InternedKey) -> Option<Value> {
    if row_id >= self.row_count {
        return None;
    }
    let row = row_id as usize;
    let from_column = match self.col_map.get(&key) {
        Some(&ColRef::Fixed(idx)) => {
            let col = &self.fixed_cols[idx];
            let present = !self.read_null(&col.nulls, row);
            present.then(|| self.read_fixed_value(col, row))
        }
        Some(&ColRef::Str(idx)) => {
            let col = &self.str_cols[idx];
            let present = !self.read_null(&col.nulls, row);
            present.then(|| Value::String(self.read_str(&col.data, &col.offsets, row).to_string()))
        }
        None => None,
    };
    from_column.or_else(|| self.get_overflow_property(row_id, key))
}
/// Decodes the cell at `row` of a fixed-width column into an owned [`Value`],
/// dispatching on the column's declared type.
///
/// Does not consult the null flags; callers check those first.
///
/// # Panics
///
/// Panics if the column is declared as `ColType::Str`.
#[inline]
fn read_fixed_value(&self, fc: &FixedColumnMeta, row: usize) -> Value {
    let data = &fc.data;
    match fc.col_type {
        ColType::Str => unreachable!("string columns use ColRef::Str"),
        ColType::Bool => Value::Boolean(self.read_u8(data, row) != 0),
        ColType::UniqueId => Value::UniqueId(self.read_u32(data, row)),
        ColType::Int64 => Value::Int64(self.read_i64(data, row)),
        ColType::Float64 => Value::Float64(self.read_f64(data, row)),
        ColType::Date => {
            // Stored as a signed day count relative to the Unix epoch.
            let day_offset = self.read_i32(data, row) as i64;
            Value::DateTime(UNIX_EPOCH_DATE + chrono::Duration::days(day_offset))
        }
    }
}
/// Returns the id of `row_id` without allocating.
///
/// String ids borrow directly from the memory map; otherwise the id is read
/// as a `u32` and returned as `BorrowedValue::UniqueId`. Yields `None` when
/// the row is out of range, the selected id column is absent, or the id is
/// flagged null.
pub fn id_borrowed(&self, row_id: u32) -> Option<crate::datatypes::values::BorrowedValue<'_>> {
    use crate::datatypes::values::BorrowedValue;
    if row_id >= self.row_count {
        return None;
    }
    let row = row_id as usize;
    if self.id_is_string {
        let col = self.id_str.as_ref()?;
        let present = !self.read_null(&col.nulls, row);
        present.then(|| BorrowedValue::String(self.read_str(&col.data, &col.offsets, row)))
    } else {
        let col = self.id_fixed.as_ref()?;
        let present = !self.read_null(&col.nulls, row);
        present.then(|| BorrowedValue::UniqueId(self.read_u32(&col.data, row)))
    }
}
/// Returns the title of `row_id` as a string slice borrowed from the map.
///
/// Yields `None` when the row is out of range, the title column is absent
/// (both its null and data regions are empty), or the title is flagged null.
pub fn title_borrowed(&self, row_id: u32) -> Option<&str> {
    let title = &self.title;
    let column_absent = title.nulls.len == 0 && title.data.len == 0;
    if row_id >= self.row_count || column_absent {
        return None;
    }
    let row = row_id as usize;
    if self.read_null(&title.nulls, row) {
        None
    } else {
        Some(self.read_str(&title.data, &title.offsets, row))
    }
}
/// Calls `f` once for every property of `row_id`, passing borrowed values.
///
/// Column-backed properties are visited first (in the iteration order of
/// `col_map`), skipping cells flagged null. The row's overflow blob, if any,
/// is then decoded entry by entry and each entry is passed to `f`, including
/// entries tagged as null (`BorrowedValue::Null`).
///
/// An out-of-range `row_id` returns `Ok(())` without calling `f`.
///
/// Overflow blob layout as decoded here: a little-endian `u16` entry count,
/// then per entry a little-endian `u64` key, a one-byte type tag and a
/// tag-dependent payload. Decoding stops silently (still returning `Ok(())`)
/// at the first truncated entry or unrecognised tag.
///
/// # Errors
///
/// Returns the first error produced by `f`; no further properties are visited.
pub fn try_for_each_property_borrowed<F, E>(&self, row_id: u32, mut f: F) -> Result<(), E>
where
F: FnMut(InternedKey, crate::datatypes::values::BorrowedValue<'_>) -> Result<(), E>,
{
use crate::datatypes::values::BorrowedValue;
if row_id >= self.row_count {
return Ok(());
}
let row = row_id as usize;
// Pass 1: properties stored in dedicated columns.
for (&key, col_ref) in &self.col_map {
match col_ref {
ColRef::Fixed(idx) => {
let fc = &self.fixed_cols[*idx];
if !self.read_null(&fc.nulls, row) {
let bv = match fc.col_type {
ColType::Int64 => BorrowedValue::Int64(self.read_i64(&fc.data, row)),
ColType::Float64 => {
BorrowedValue::Float64(self.read_f64(&fc.data, row))
}
ColType::UniqueId => {
BorrowedValue::UniqueId(self.read_u32(&fc.data, row))
}
ColType::Bool => {
BorrowedValue::Boolean(self.read_u8(&fc.data, row) != 0)
}
ColType::Date => {
// Stored as a signed day count relative to the Unix epoch.
let days = self.read_i32(&fc.data, row);
BorrowedValue::DateTime(
UNIX_EPOCH_DATE + chrono::Duration::days(days as i64),
)
}
ColType::Str => unreachable!("string columns use ColRef::Str"),
};
f(key, bv)?;
}
}
ColRef::Str(idx) => {
let sc = &self.str_cols[*idx];
if !self.read_null(&sc.nulls, row) {
let s = self.read_str(&sc.data, &sc.offsets, row);
f(key, BorrowedValue::String(s))?;
}
}
}
}
// Pass 2: properties stored in the row's overflow blob.
if let Some(blob) = self.overflow_blob(row_id) {
if blob.len() >= 2 {
let num_entries = u16::from_le_bytes([blob[0], blob[1]]) as usize;
// `pos` is the read cursor; it starts just past the entry count.
let mut pos = 2;
for _ in 0..num_entries {
// Every entry starts with an 8-byte key and a 1-byte type tag.
if pos + 9 > blob.len() {
break;
}
let entry_key =
u64::from_le_bytes(blob[pos..pos + 8].try_into().unwrap_or([0; 8]));
let type_tag = blob[pos + 8];
pos += 9;
let key = InternedKey::from_u64(entry_key);
match type_tag {
// Tag 0: null, no payload.
0 => {
f(key, BorrowedValue::Null)?;
}
// Tag 1: little-endian i64.
1 => {
if pos + 8 > blob.len() {
break;
}
let v =
i64::from_le_bytes(blob[pos..pos + 8].try_into().unwrap_or([0; 8]));
pos += 8;
f(key, BorrowedValue::Int64(v))?;
}
// Tag 2: little-endian f64.
2 => {
if pos + 8 > blob.len() {
break;
}
let v =
f64::from_le_bytes(blob[pos..pos + 8].try_into().unwrap_or([0; 8]));
pos += 8;
f(key, BorrowedValue::Float64(v))?;
}
// Tag 3: little-endian u32 unique id.
3 => {
if pos + 4 > blob.len() {
break;
}
let v =
u32::from_le_bytes(blob[pos..pos + 4].try_into().unwrap_or([0; 4]));
pos += 4;
f(key, BorrowedValue::UniqueId(v))?;
}
// Tag 4: one-byte boolean, non-zero is true.
4 => {
if pos + 1 > blob.len() {
break;
}
let v = blob[pos] != 0;
pos += 1;
f(key, BorrowedValue::Boolean(v))?;
}
// Tag 5: little-endian i32 day count relative to the Unix epoch.
5 => {
if pos + 4 > blob.len() {
break;
}
let days =
i32::from_le_bytes(blob[pos..pos + 4].try_into().unwrap_or([0; 4]));
pos += 4;
f(
key,
BorrowedValue::DateTime(
UNIX_EPOCH_DATE + chrono::Duration::days(days as i64),
),
)?;
}
// Tag 6: little-endian u32 byte length followed by the string bytes.
6 => {
if pos + 4 > blob.len() {
break;
}
let slen =
u32::from_le_bytes(blob[pos..pos + 4].try_into().unwrap_or([0; 4]))
as usize;
pos += 4;
if pos + slen > blob.len() {
break;
}
// SAFETY: assumes overflow strings were written as valid UTF-8; the
// bytes are not validated here.
// NOTE(review): the owned decoder (`read_overflow_value`) uses
// `from_utf8_lossy` for the same bytes — confirm that skipping
// validation on this path is intended.
let s =
unsafe { std::str::from_utf8_unchecked(&blob[pos..pos + slen]) };
pos += slen;
f(key, BorrowedValue::String(s))?;
}
// Unknown tag: payload size is unknown, so the cursor cannot advance.
_ => break,
}
}
}
}
Ok(())
}
/// Collects every property of `row_id` as owned `(key, value)` pairs.
///
/// Non-null column cells come first (in the iteration order of `col_map`),
/// followed by the entries decoded from the row's overflow blob. An
/// out-of-range row yields an empty vector.
pub fn row_properties(&self, row_id: u32) -> Vec<(InternedKey, Value)> {
    if row_id >= self.row_count {
        return Vec::new();
    }
    let row = row_id as usize;
    let mut props: Vec<(InternedKey, Value)> = self
        .col_map
        .iter()
        .filter_map(|(&key, col_ref)| {
            let value = match *col_ref {
                ColRef::Fixed(idx) => {
                    let col = &self.fixed_cols[idx];
                    if self.read_null(&col.nulls, row) {
                        return None;
                    }
                    self.read_fixed_value(col, row)
                }
                ColRef::Str(idx) => {
                    let col = &self.str_cols[idx];
                    if self.read_null(&col.nulls, row) {
                        return None;
                    }
                    Value::String(self.read_str(&col.data, &col.offsets, row).to_string())
                }
            };
            Some((key, value))
        })
        .collect();
    props.extend(self.overflow_row_properties(row_id));
    props
}
}
impl MmapColumnStore {
/// Looks up `key` in the overflow blob of `row_id`.
///
/// Yields `None` when the row has no overflow blob, the key is not present
/// in it, or the matching entry cannot be decoded.
pub fn get_overflow_property(&self, row_id: u32, key: InternedKey) -> Option<Value> {
    self.overflow_blob(row_id)
        .and_then(|blob| Self::scan_overflow_blob(blob, key))
}
/// Decodes all entries of the overflow blob of `row_id`; an empty vector is
/// returned when the row has no blob.
fn overflow_row_properties(&self, row_id: u32) -> Vec<(InternedKey, Value)> {
    self.overflow_blob(row_id)
        .map_or_else(Vec::new, Self::decode_overflow_blob)
}
/// Returns the raw overflow blob of `row_id`, borrowed from the memory map.
///
/// The offsets table holds `row_count + 1` `u64` entries; the blob of row
/// `i` spans `offsets[i]..offsets[i + 1]` within the overflow data region.
///
/// Yields `None` when the store has no overflow section, `row_id` is out of
/// range, the offsets table is too short, or the row's span is empty or
/// extends past the overflow data region.
fn overflow_blob(&self, row_id: u32) -> Option<&[u8]> {
    // The row bound must be checked here: this is reachable from the public
    // `get_overflow_property` without any prior validation, and reading
    // `offsets[idx + 1]` for an out-of-range row would run past the table.
    if !self.has_overflow || row_id >= self.row_count {
        return None;
    }
    let idx = row_id as usize;
    let expected_len = (self.row_count as usize + 1) * 8;
    if self.overflow_offsets.len < expected_len {
        return None;
    }
    let start = self.read_u64(&self.overflow_offsets, idx) as usize;
    let end = self.read_u64(&self.overflow_offsets, idx + 1) as usize;
    if start >= end || end > self.overflow_data.len {
        return None;
    }
    let base = self.overflow_data.offset;
    Some(&self.mmap[base + start..base + end])
}
/// Searches an overflow blob for `key` and decodes the matching entry.
///
/// The blob starts with a little-endian `u16` entry count; each entry is a
/// little-endian `u64` key, a one-byte type tag and a tag-dependent payload.
/// Entries before the match are skipped without being decoded.
///
/// Yields `None` when the key is absent, the matching entry cannot be
/// decoded, or the blob is malformed (truncated, or containing an entry
/// with an unrecognised tag ahead of the match).
fn scan_overflow_blob(blob: &[u8], key: InternedKey) -> Option<Value> {
    // Highest type tag whose payload size is known to this reader.
    const MAX_KNOWN_TAG: u8 = 6;

    let target = key.as_u64();
    if blob.len() < 2 {
        return None;
    }
    let num_entries = u16::from_le_bytes([blob[0], blob[1]]) as usize;
    let mut pos = 2;
    for _ in 0..num_entries {
        if pos + 9 > blob.len() {
            break;
        }
        let entry_key = u64::from_le_bytes(blob[pos..pos + 8].try_into().ok()?);
        let type_tag = blob[pos + 8];
        pos += 9;
        if entry_key == target {
            return Self::read_overflow_value(blob, &mut pos, type_tag);
        }
        // An unrecognised tag has an unknown payload size, so the cursor
        // cannot be advanced past it. Continuing would reinterpret payload
        // bytes as the next key/tag and could report a bogus match.
        if type_tag > MAX_KNOWN_TAG {
            return None;
        }
        Self::skip_overflow_value(blob, &mut pos, type_tag);
    }
    None
}
/// Decodes every entry of an overflow blob into owned `(key, value)` pairs.
///
/// The blob starts with a little-endian `u16` entry count; each entry is a
/// little-endian `u64` key, a one-byte type tag and a tag-dependent payload.
///
/// Decoding stops at the first entry that is truncated or carries an
/// unrecognised tag; the entries decoded up to that point are returned.
fn decode_overflow_blob(blob: &[u8]) -> Vec<(InternedKey, Value)> {
    if blob.len() < 2 {
        return Vec::new();
    }
    let num_entries = u16::from_le_bytes([blob[0], blob[1]]) as usize;
    let mut result = Vec::with_capacity(num_entries);
    let mut pos = 2;
    for _ in 0..num_entries {
        if pos + 9 > blob.len() {
            break;
        }
        let entry_key = u64::from_le_bytes(blob[pos..pos + 8].try_into().unwrap_or([0; 8]));
        let type_tag = blob[pos + 8];
        pos += 9;
        let key = InternedKey::from_u64(entry_key);
        match Self::read_overflow_value(blob, &mut pos, type_tag) {
            Some(val) => result.push((key, val)),
            // A failed decode leaves the cursor at an unknown position inside
            // the entry; continuing would read payload bytes as the next
            // key/tag and emit garbage properties.
            None => break,
        }
    }
    result
}
/// Decodes one overflow payload of the given `type_tag` starting at `*pos`,
/// advancing `*pos` past the bytes consumed.
///
/// Tags: 0 null (no payload), 1 `i64`, 2 `f64`, 3 `u32` unique id, 4 one-byte
/// boolean, 5 `i32` day count relative to the Unix epoch, 6 `u32` byte
/// length followed by string bytes (invalid UTF-8 is replaced lossily).
/// Multi-byte numbers are little-endian.
///
/// Yields `None` for an unrecognised tag or a truncated payload. On
/// truncation `*pos` is left untouched, except for tag 6 where a complete
/// length prefix has already been consumed when the string bytes turn out
/// to be short.
fn read_overflow_value(blob: &[u8], pos: &mut usize, type_tag: u8) -> Option<Value> {
    /// Hands out the next `n` bytes and advances the cursor, or returns
    /// `None` (cursor unchanged) when fewer than `n` bytes remain.
    fn take<'a>(blob: &'a [u8], pos: &mut usize, n: usize) -> Option<&'a [u8]> {
        let end = *pos + n;
        if end > blob.len() {
            return None;
        }
        let bytes = &blob[*pos..end];
        *pos = end;
        Some(bytes)
    }

    match type_tag {
        0 => Some(Value::Null),
        1 => {
            let raw = take(blob, pos, 8)?;
            Some(Value::Int64(i64::from_le_bytes(raw.try_into().ok()?)))
        }
        2 => {
            let raw = take(blob, pos, 8)?;
            Some(Value::Float64(f64::from_le_bytes(raw.try_into().ok()?)))
        }
        3 => {
            let raw = take(blob, pos, 4)?;
            Some(Value::UniqueId(u32::from_le_bytes(raw.try_into().ok()?)))
        }
        4 => {
            let raw = take(blob, pos, 1)?;
            Some(Value::Boolean(raw[0] != 0))
        }
        5 => {
            let raw = take(blob, pos, 4)?;
            let day_offset = i32::from_le_bytes(raw.try_into().ok()?) as i64;
            Some(Value::DateTime(
                UNIX_EPOCH_DATE + chrono::Duration::days(day_offset),
            ))
        }
        6 => {
            let prefix = take(blob, pos, 4)?;
            let text_len = u32::from_le_bytes(prefix.try_into().ok()?) as usize;
            let text = take(blob, pos, text_len)?;
            Some(Value::String(String::from_utf8_lossy(text).into_owned()))
        }
        _ => None,
    }
}
/// Advances `*pos` past one overflow payload of the given `type_tag` without
/// decoding it.
///
/// Fixed-size payloads advance by their width (tags 1/2: 8 bytes, 3/5: 4
/// bytes, 4: 1 byte, 0: nothing) with no bounds check, so `*pos` may end up
/// beyond the blob. A string (tag 6) advances by its 4-byte little-endian
/// length prefix plus the declared length, but only when the prefix itself
/// fits in the blob. Unrecognised tags leave `*pos` unchanged.
fn skip_overflow_value(blob: &[u8], pos: &mut usize, type_tag: u8) {
    let width = match type_tag {
        1 | 2 => 8,
        3 | 5 => 4,
        4 => 1,
        6 => {
            if *pos + 4 > blob.len() {
                // Length prefix is truncated: nothing can be skipped.
                return;
            }
            let prefix: [u8; 4] = blob[*pos..*pos + 4].try_into().unwrap_or([0; 4]);
            4 + u32::from_le_bytes(prefix) as usize
        }
        _ => 0,
    };
    *pos += width;
}
}