use crate::deserializer::to_cj_feature;
use crate::{add_indices_to_multi_memory_index, build_query, fb::*, AttrQuery};
use crate::error::{Error, Result};
use crate::packed_rtree::Query;
use crate::reader::city_buffer::FcbBuffer;
use crate::static_btree::{FixedStringKey, Float, KeyType, Operator};
use crate::{
check_magic_bytes, size_prefixed_root_as_city_feature, HEADER_MAX_BUFFER_SIZE,
HEADER_SIZE_SIZE, MAGIC_BYTES_SIZE,
};
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use cjseq::CityJSONFeature;
use http_range_client::BufferedHttpRangeClient;
use http_range_client::{AsyncBufferedHttpRangeClient, AsyncHttpRangeClient};
use log::debug;
use reqwest;
use crate::packed_rtree::{http::HttpRange, http::HttpSearchResultItem, NodeItem, PackedRTree};
use crate::static_btree::{
http::HttpRange as AttrHttpRange, http::HttpSearchResultItem as AttrHttpSearchResultItem,
};
use crate::static_btree::{HttpIndex, HttpMultiIndex};
use std::collections::HashMap;
use std::collections::VecDeque;
use std::ops::Range;
use tracing::trace;
#[cfg(test)]
mod mock_http_range_client;
const DEFAULT_HTTP_FETCH_SIZE: usize = 1_048_576;
/// HTTP-backed FlatCityBuf reader: fetches header, indices and features
/// on demand via byte-range requests.
pub struct HttpFcbReader<T: AsyncHttpRangeClient + Send + Sync> {
    /// Buffered range client; coalesces small reads into larger requests.
    client: AsyncBufferedHttpRangeClient<T>,
    /// Holds the header bytes (features buffer stays empty at this stage).
    fbs: FcbBuffer,
}
/// Async iterator over the features matched by a selection.
pub struct AsyncFeatureIter<T: AsyncHttpRangeClient + Send + Sync> {
    client: AsyncBufferedHttpRangeClient<T>,
    /// Header bytes plus the most recently fetched feature bytes.
    fbs: FcbBuffer,
    /// Which features to iterate over (all / bbox hits / attribute hits).
    selection: FeatureSelection,
    /// Number of features in the selection.
    count: usize,
}
impl HttpFcbReader<reqwest::Client> {
    /// Opens an FCB file over HTTP(S) with a default reqwest-backed
    /// buffered range client and reads its header.
    pub async fn open(url: &str) -> Result<HttpFcbReader<reqwest::Client>> {
        Self::_open(BufferedHttpRangeClient::new(url)).await
    }
}
impl<T: AsyncHttpRangeClient + Send + Sync> HttpFcbReader<T> {
pub async fn new(client: AsyncBufferedHttpRangeClient<T>) -> Result<HttpFcbReader<T>> {
Self::_open(client).await
}
async fn _open(mut client: AsyncBufferedHttpRangeClient<T>) -> Result<HttpFcbReader<T>> {
let prefetch_index_bytes: usize = {
let assumed_branching_factor = PackedRTree::DEFAULT_NODE_SIZE as usize;
let prefetched_layers: u32 = 3;
(0..prefetched_layers)
.map(|i| assumed_branching_factor.pow(i) * std::mem::size_of::<NodeItem>())
.sum()
};
let assumed_header_size = 2024;
let min_req_size = assumed_header_size + prefetch_index_bytes;
client.set_min_req_size(min_req_size);
let mut read_bytes = 0;
let bytes = client.get_range(read_bytes, MAGIC_BYTES_SIZE).await?; if !check_magic_bytes(bytes) {
return Err(Error::MissingMagicBytes);
}
read_bytes += MAGIC_BYTES_SIZE;
let mut bytes = BytesMut::from(client.get_range(read_bytes, HEADER_SIZE_SIZE).await?);
read_bytes += HEADER_SIZE_SIZE;
let header_size = LittleEndian::read_u32(&bytes) as usize;
if header_size > HEADER_MAX_BUFFER_SIZE || header_size < 8 {
return Err(Error::IllegalHeaderSize(header_size));
}
bytes.put(client.get_range(read_bytes, header_size).await?);
read_bytes += header_size;
let header_buf = bytes.to_vec();
let _header = size_prefixed_root_as_header(&header_buf)?;
Ok(HttpFcbReader {
client,
fbs: FcbBuffer {
header_buf,
features_buf: Vec::new(),
},
})
}
pub fn header(&self) -> Header {
self.fbs.header()
}
fn header_len(&self) -> usize {
MAGIC_BYTES_SIZE + self.fbs.header_buf.len()
}
fn rtree_index_size(&self) -> usize {
let header = self.fbs.header();
let feat_count = header.features_count() as usize;
if header.index_node_size() > 0 && feat_count > 0 {
PackedRTree::index_size(feat_count, header.index_node_size())
} else {
0
}
}
fn attr_index_size(&self) -> usize {
let header = self.fbs.header();
header
.attribute_index()
.map(|attr_index| {
attr_index
.iter()
.try_fold(0, |acc, ai| {
let len = ai.length() as usize;
if len > usize::MAX - acc {
Err(Error::AttributeIndexSizeOverflow)
} else {
Ok(acc + len)
}
}) .unwrap_or(0)
})
.unwrap_or(0)
}
fn index_size(&self) -> usize {
self.rtree_index_size() + self.attr_index_size()
}
pub async fn select_all(self) -> Result<AsyncFeatureIter<T>> {
let header = self.fbs.header();
let count = header.features_count();
let index_size = self.index_size() as usize;
let feature_base = self.header_len() + index_size;
Ok(AsyncFeatureIter {
client: self.client,
fbs: self.fbs,
selection: FeatureSelection::SelectAll(SelectAll {
features_left: count,
pos: feature_base,
}),
count: count as usize,
})
}
pub async fn select_query(mut self, query: Query) -> Result<AsyncFeatureIter<T>> {
self.select_query_paged(query, None, None).await
}
pub async fn select_query_paged(
mut self,
query: Query,
limit: Option<usize>,
offset: Option<usize>,
) -> Result<AsyncFeatureIter<T>> {
let header = self.fbs.header();
if header.index_node_size() == 0 || header.features_count() == 0 {
return Err(Error::NoIndex);
}
let count = header.features_count() as usize;
let header_len = self.header_len();
let combine_request_threshold = 256 * 1024;
let attr_index_size = self.attr_index_size() as usize;
let list = PackedRTree::http_stream_search(
&mut self.client,
header_len,
attr_index_size,
count,
PackedRTree::DEFAULT_NODE_SIZE,
query,
combine_request_threshold,
)
.await?;
debug_assert!(
list.windows(2)
.all(|w| w[0].range.start() < w[1].range.start()),
"Since the tree is traversed breadth first, list should be sorted by construction."
);
let total_count = list.len();
let start = offset.unwrap_or(0).min(total_count);
let end = match limit {
Some(l) => start.saturating_add(l).min(total_count),
None => total_count,
};
let page_list: Vec<_> = if start < end {
list.into_iter().skip(start).take(end - start).collect()
} else {
Vec::new()
};
let feature_batches =
FeatureBatch::make_batches(page_list, combine_request_threshold).await?;
let selection = FeatureSelection::SelectBbox(SelectBbox { feature_batches });
Ok(AsyncFeatureIter {
client: self.client,
fbs: self.fbs,
selection,
count: total_count,
})
}
pub async fn select_attr_query(mut self, query: &AttrQuery) -> Result<AsyncFeatureIter<T>> {
self.select_attr_query_paged(query, None, None).await
}
pub async fn select_attr_query_paged(
mut self,
query: &AttrQuery,
limit: Option<usize>,
offset: Option<usize>,
) -> Result<AsyncFeatureIter<T>> {
let header = self.fbs.header();
let header_len = self.header_len();
let rtree_index_size = self.rtree_index_size() as usize;
let attr_index_size = self.attr_index_size() as usize;
let attr_index_begin = header_len + rtree_index_size;
let feature_begin = header_len + rtree_index_size + attr_index_size;
let attr_index_entries = header
.attribute_index()
.ok_or_else(|| Error::AttributeIndexNotFound)?;
let mut attr_index_entries = attr_index_entries.iter().collect::<Vec<_>>();
let columns: Vec<Column> = header
.columns()
.ok_or_else(|| Error::NoColumnsInHeader)?
.iter()
.collect();
attr_index_entries.sort_by_key(|attr_info| attr_info.index());
let query = build_query(&query);
let mut http_multi_index = HttpMultiIndex::new();
let mut current_index_begin = attr_index_begin;
for attr_info in attr_index_entries.iter() {
Self::add_indices_to_multi_http_index(
&mut http_multi_index,
&columns,
attr_info,
current_index_begin,
feature_begin,
)?;
current_index_begin += attr_info.length() as usize;
}
let result = http_multi_index
.query(&mut self.client, &query.conditions)
.await?;
let total_count = result.len();
let start = offset.unwrap_or(0).min(total_count);
let end = match limit {
Some(l) => start.saturating_add(l).min(total_count),
None => total_count,
};
let paged_iter: Vec<_> = if start < end {
result.into_iter().skip(start).take(end - start).collect()
} else {
Vec::new()
};
let http_ranges: Vec<HttpRange> = paged_iter
.into_iter()
.map(|item| match item.range {
AttrHttpRange::Range(range) => HttpRange::Range(range.start..range.end),
AttrHttpRange::RangeFrom(range) => HttpRange::RangeFrom(range.start..),
})
.collect();
Ok(AsyncFeatureIter {
client: self.client,
fbs: self.fbs,
selection: FeatureSelection::SelectAttr(SelectAttr {
ranges: http_ranges,
range_pos: 0,
}),
count: total_count,
})
}
pub fn add_indices_to_multi_http_index<C: AsyncHttpRangeClient + Send + Sync>(
multi_index: &mut HttpMultiIndex<C>,
columns: &[Column],
attr_info: &AttributeIndex,
index_begin: usize,
feature_begin: usize,
) -> Result<()> {
if let Some(col) = columns.iter().find(|col| col.index() == attr_info.index()) {
match col.type_() {
ColumnType::Int => {
let index = HttpIndex::<i32>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
ColumnType::Float => {
let index = HttpIndex::<Float<f32>>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
ColumnType::Double => {
let index = HttpIndex::<Float<f64>>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
ColumnType::String => {
let index = HttpIndex::<FixedStringKey<50>>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
ColumnType::Bool => {
let index = HttpIndex::<bool>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
ColumnType::DateTime => {
let index = HttpIndex::<DateTime<Utc>>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
ColumnType::Short => {
let index = HttpIndex::<i16>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
ColumnType::UShort => {
let index = HttpIndex::<u16>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
ColumnType::UInt => {
let index = HttpIndex::<u32>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
ColumnType::ULong => {
let index = HttpIndex::<u64>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
ColumnType::Byte => {
let index = HttpIndex::<i8>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
ColumnType::UByte => {
let index = HttpIndex::<u8>::new(
attr_info.num_unique_items() as usize,
attr_info.branching_factor(),
index_begin,
feature_begin,
1024 * 1024, );
multi_index.add_index(col.name().to_string(), index);
}
_ => {
println!("Unsupported column type: {:?}", col.type_());
return Err(Error::UnsupportedColumnType(col.name().to_string()));
}
}
}
Ok(())
}
}
impl<T: AsyncHttpRangeClient + Send + Sync> AsyncFeatureIter<T> {
    /// Deserialized FCB header view.
    pub fn header(&self) -> Header {
        self.fbs.header()
    }

    /// Number of selected features, or `None` when the count is zero.
    pub fn features_count(&self) -> Option<usize> {
        (self.count > 0).then_some(self.count)
    }

    /// Fetches the next feature's bytes into the internal buffer.
    ///
    /// Returns `Ok(None)` once the selection is exhausted.
    pub async fn next(&mut self) -> Result<Option<&FcbBuffer>> {
        match self.selection.next_feature_buffer(&mut self.client).await? {
            None => Ok(None),
            Some(buffer) => {
                self.fbs.features_buf = buffer.to_vec();
                // Validate the flatbuffer before handing the buffer out.
                let _feature = size_prefixed_root_as_city_feature(&self.fbs.features_buf)?;
                Ok(Some(&self.fbs))
            }
        }
    }

    /// Buffer holding the most recently fetched feature.
    pub fn cur_feature(&self) -> &FcbBuffer {
        &self.fbs
    }

    /// Decodes the current feature buffer into a `CityJSONFeature`.
    pub fn cur_cj_feature(&self) -> Result<CityJSONFeature> {
        let decoded = to_cj_feature(
            self.cur_feature().feature(),
            self.header().columns(),
            self.header().semantic_columns(),
        )?;
        Ok(decoded)
    }
}
/// Strategy for which features to fetch and in what order.
enum FeatureSelection {
    /// Sequential scan of every feature.
    SelectAll(SelectAll),
    /// Features matched by a spatial (bbox) index query.
    SelectBbox(SelectBbox),
    /// Features matched by an attribute index query.
    SelectAttr(SelectAttr),
}
impl FeatureSelection {
    /// Dispatches to the concrete selection strategy's `next_buffer`.
    async fn next_feature_buffer<T: AsyncHttpRangeClient>(
        &mut self,
        client: &mut AsyncBufferedHttpRangeClient<T>,
    ) -> Result<Option<Bytes>> {
        match self {
            Self::SelectAll(inner) => inner.next_buffer(client).await,
            Self::SelectBbox(inner) => inner.next_buffer(client).await,
            Self::SelectAttr(inner) => inner.next_buffer(client).await,
        }
    }
}
/// Cursor state for a full sequential scan.
struct SelectAll {
    /// Features still to be read.
    features_left: u64,
    /// Absolute byte offset of the next feature record.
    pos: usize,
}
impl SelectAll {
    /// Reads the next length-prefixed feature record at `self.pos`.
    async fn next_buffer<T: AsyncHttpRangeClient>(
        &mut self,
        client: &mut AsyncBufferedHttpRangeClient<T>,
    ) -> Result<Option<Bytes>> {
        client.min_req_size(DEFAULT_HTTP_FETCH_SIZE);
        if self.features_left == 0 {
            return Ok(None);
        }
        self.features_left -= 1;
        // A record is a little-endian u32 size prefix + flatbuffer payload.
        let mut buf = BytesMut::from(client.get_range(self.pos, 4).await?);
        self.pos += 4;
        let body_len = LittleEndian::read_u32(&buf) as usize;
        buf.put(client.get_range(self.pos, body_len).await?);
        self.pos += body_len;
        Ok(Some(buf.freeze()))
    }
}
/// Bbox query results, grouped into batches to combine HTTP requests.
struct SelectBbox {
    /// Batches in reverse order: the next batch to read is the last element.
    feature_batches: Vec<FeatureBatch>,
}
impl SelectBbox {
    /// Yields the next feature buffer, discarding exhausted batches as it
    /// goes; `Ok(None)` once every batch has been consumed.
    async fn next_buffer<T: AsyncHttpRangeClient>(
        &mut self,
        client: &mut AsyncBufferedHttpRangeClient<T>,
    ) -> Result<Option<Bytes>> {
        loop {
            let Some(batch) = self.feature_batches.last_mut() else {
                // No batches left at all.
                return Ok(None);
            };
            match batch.next_buffer(client).await? {
                Some(buffer) => return Ok(Some(buffer)),
                None => {
                    // Current batch is exhausted: drop it and try the next.
                    self.feature_batches.pop();
                }
            }
        }
    }
}
/// A run of feature ranges close enough together to serve from one request.
struct FeatureBatch {
    /// Byte ranges of the features, in read order.
    feature_ranges: VecDeque<HttpRange>,
}
impl FeatureBatch {
    /// Groups sorted search results into batches: consecutive ranges whose
    /// gap is below `combine_request_threshold` share a batch. The returned
    /// vector is reversed so callers can consume batches by popping.
    async fn make_batches(
        feature_ranges: Vec<HttpSearchResultItem>,
        combine_request_threshold: usize,
    ) -> Result<Vec<Self>> {
        let mut grouped: Vec<VecDeque<HttpRange>> = vec![];
        for item in feature_ranges.into_iter() {
            let Some(current_batch) = grouped.last_mut() else {
                // First item starts the first batch.
                grouped.push(VecDeque::from([item.range]));
                continue;
            };
            let tail = current_batch.back().expect("we never push an empty batch");
            let HttpRange::Range(Range { end: prev_end, .. }) = tail else {
                debug_assert!(false, "This shouldn't happen. Only the very last feature is expected to have an unknown length");
                grouped.push(VecDeque::from([item.range]));
                continue;
            };
            // Bytes between the previous feature's end and this one's start
            // that a combined request would fetch for nothing.
            let wasted_bytes = item.range.start() - prev_end;
            if wasted_bytes < combine_request_threshold {
                current_batch.push_back(item.range)
            } else {
                debug!("creating a new request for batch rather than wasting {wasted_bytes} bytes");
                grouped.push(VecDeque::from([item.range]));
            }
        }
        let mut batches: Vec<_> = grouped.into_iter().map(FeatureBatch::new).collect();
        // Reverse so the next batch to read sits at the end (cheap pop).
        batches.reverse();
        Ok(batches)
    }

    /// Wraps a queue of feature ranges into a batch.
    fn new(feature_ranges: VecDeque<HttpRange>) -> Self {
        Self { feature_ranges }
    }

    /// Bytes a single request covering this whole batch would span, capped
    /// at `DEFAULT_HTTP_FETCH_SIZE`.
    fn request_size(&self) -> usize {
        let (Some(first), Some(last)) = (self.feature_ranges.front(), self.feature_ranges.back())
        else {
            return 0;
        };
        // The final feature may have an unknown length; assume at least its
        // 4-byte size prefix.
        let tail_len = last.length().unwrap_or(4);
        (last.start() + tail_len)
            .saturating_sub(first.start())
            .min(DEFAULT_HTTP_FETCH_SIZE)
    }

    /// Pops and fetches the next feature in this batch.
    async fn next_buffer<T: AsyncHttpRangeClient>(
        &mut self,
        client: &mut AsyncBufferedHttpRangeClient<T>,
    ) -> Result<Option<Bytes>> {
        // Size the client's buffering so one request can serve the batch.
        client.set_min_req_size(self.request_size());
        let Some(feature_range) = self.feature_ranges.pop_front() else {
            return Ok(None);
        };
        let mut pos = feature_range.start();
        // u32 size prefix, then the feature flatbuffer.
        let mut buf = BytesMut::from(client.get_range(pos, 4).await?);
        pos += 4;
        let body_len = LittleEndian::read_u32(&buf) as usize;
        buf.put(client.get_range(pos, body_len).await?);
        Ok(Some(buf.freeze()))
    }
}
/// Feature ranges produced by an attribute query, read one at a time.
struct SelectAttr {
    /// Byte ranges of the matched features.
    ranges: Vec<HttpRange>,
    /// Index of the next range to read.
    range_pos: usize,
}
impl SelectAttr {
    /// Fetches the feature at the current range cursor and advances it.
    async fn next_buffer<T: AsyncHttpRangeClient>(
        &mut self,
        client: &mut AsyncBufferedHttpRangeClient<T>,
    ) -> Result<Option<Bytes>> {
        let Some(range) = self.ranges.get(self.range_pos) else {
            return Ok(None);
        };
        // u32 size prefix, then the feature flatbuffer.
        let mut buf = BytesMut::from(client.get_range(range.start(), 4).await?);
        let body_len = LittleEndian::read_u32(&buf) as usize;
        buf.put(client.get_range(range.start() + 4, body_len).await?);
        self.range_pos += 1;
        Ok(Some(buf.freeze()))
    }
}