use std::io::Read;
use rkyv::{de::Pool, rancor::Strategy, Archive, DeserializeUnsized};
use crate::{
page::util::parse_general_header,
persistence::data::{rkyv_data::parse_archived_row, DataTypeValue},
IndexData, Link,
};
use super::{index::{ArchivedIndexValue, IndexValue}, seek_by_link, seek_to_page_start, Interval, SpaceInfo};
pub struct LinksIterator<'a> {
file: &'a mut std::fs::File,
page_id: u32,
links: Option<Vec<Link>>,
link_index: usize,
primary_key_type: String,
}
impl<'a> LinksIterator<'a> {
pub fn new(
file: &'a mut std::fs::File,
page_id: u32,
space_info: &SpaceInfo,
) -> LinksIterator<'a> {
let primary_key_fields = &space_info.primary_key_fields;
let primary_key_type = space_info
.row_schema
.iter()
.filter(|(field_name, _field_type)| field_name == &primary_key_fields[0])
.map(|(_field_name, field_type)| field_type)
.take(1)
.collect::<Vec<&String>>()[0];
LinksIterator {
file,
page_id,
links: None,
link_index: 0,
primary_key_type: primary_key_type.clone(),
}
}
}
fn parse_links<T>(buffer: &[u8]) -> Vec<Link>
where T: Archive,
[ArchivedIndexValue<T>]: DeserializeUnsized<[IndexValue<T>], Strategy<Pool, rkyv::rancor::Error>>
{
let archived = unsafe {
rkyv::access_unchecked::<<IndexData<T> as Archive>::Archived>(
&buffer[..],
)
};
let index_records =
rkyv::deserialize::<IndexData<T>, rkyv::rancor::Error>(archived)
.expect("data should be valid")
.index_values;
index_records
.iter()
.map(|index_value| index_value.link)
.collect::<Vec<_>>()
}
impl Iterator for LinksIterator<'_> {
type Item = Link;
fn next(&mut self) -> Option<Self::Item> {
if self.links.is_none() {
seek_to_page_start(&mut self.file, self.page_id).expect("page should be seekable");
let header = parse_general_header(&mut self.file).expect("header should be readable");
let mut buffer: Vec<u8> = vec![0u8; header.data_length as usize];
self.file
.read_exact(&mut buffer)
.expect("index data should be readable");
self.links = Some(match self.primary_key_type.as_str() {
"String" => parse_links::<String>(&buffer),
"i128" => parse_links::<i128>(&buffer),
"i64" => parse_links::<i64>(&buffer),
"i32" => parse_links::<i32>(&buffer),
"i16" => parse_links::<i16>(&buffer),
"i8" => parse_links::<i8>(&buffer),
"u128" => parse_links::<u128>(&buffer),
"u64" => parse_links::<u64>(&buffer),
"u32" => parse_links::<u32>(&buffer),
"u16" => parse_links::<u16>(&buffer),
"u8" => parse_links::<u8>(&buffer),
"f64" => parse_links::<f64>(&buffer),
"f32" => parse_links::<f32>(&buffer),
_ => panic!(
"Unsupported primary key data type `{}`",
self.primary_key_type
),
});
}
if self.link_index < self.links.as_deref().unwrap().len() {
let result = Some(self.links.as_deref().unwrap()[self.link_index]);
self.link_index += 1;
result
} else {
None
}
}
}
pub struct DataIterator<'a> {
file: &'a mut std::fs::File,
schema: Vec<(String, String)>,
links: Vec<Link>,
link_index: usize,
}
impl DataIterator<'_> {
pub fn new(
file: &mut std::fs::File,
schema: Vec<(String, String)>,
mut links: Vec<Link>,
) -> DataIterator<'_> {
links.sort_by(|a, b| {
(a.page_id, a.offset)
.partial_cmp(&(b.page_id, b.offset))
.unwrap()
});
DataIterator {
file,
schema,
links,
link_index: 0,
}
}
}
impl Iterator for DataIterator<'_> {
type Item = Vec<DataTypeValue>;
fn next(&mut self) -> Option<Self::Item> {
if self.link_index >= self.links.len() {
return None;
}
let current_link = self.links[self.link_index];
seek_by_link(&mut self.file, current_link).expect("the seek should be successful");
let mut buffer = vec![0u8; current_link.length as usize];
self.file
.read_exact(&mut buffer)
.expect("the data should be read");
let row = parse_archived_row(&buffer, &self.schema);
self.link_index += 1;
Some(row)
}
}
#[cfg(test)]
mod test {
use crate::{
page::{iterators::DataIterator, util::parse_space_info, PageId},
persistence::data::DataTypeValue,
Interval, Link, PAGE_SIZE,
};
use crate::page::util::test::create_test_database_file;
use super::LinksIterator;
#[test]
fn test_links_iterator() {
let filename = "tests/data/table_links_test.wt";
create_test_database_file(filename);
let mut file = std::fs::File::open(filename).unwrap();
let space_info = parse_space_info::<PAGE_SIZE>(&mut file).unwrap();
let links = LinksIterator::<'_>::new(&mut file, 1, &space_info);
assert_eq!(
links.collect::<Vec<_>>(),
vec![
Link {
page_id: PageId(2),
offset: 0,
length: 24
},
Link {
page_id: PageId(2),
offset: 24,
length: 28
}
]
);
}
#[test]
fn test_pages_and_links_iterators() {
let filename = "tests/data/table_pages_and_links_test.wt";
create_test_database_file(filename);
let mut file = std::fs::File::open(filename).unwrap();
let space_info = parse_space_info::<PAGE_SIZE>(&mut file).unwrap();
let index_intervals = space_info.primary_key_intervals.clone();
let pages_ids = PageIterator::new(index_intervals).collect::<Vec<_>>();
assert_eq!(pages_ids, vec![1]);
let links =
LinksIterator::<'_>::new(&mut file, pages_ids[0], &space_info).collect::<Vec<_>>();
let data_iterator: DataIterator<'_> =
DataIterator::new(&mut file, space_info.row_schema, links);
assert_eq!(
data_iterator.collect::<Vec<_>>(),
vec![
vec![
DataTypeValue::I32(1),
DataTypeValue::String("first string".to_string())
],
vec![
DataTypeValue::I32(2),
DataTypeValue::String("second string".to_string())
]
]
);
}
}