use std::collections::HashMap;
use std::fmt::Debug;
use std::io::{Read, Seek, SeekFrom};
use std::marker::PhantomData;
use std::ops::Range;
use chrono::{DateTime, Utc};
use ordered_float::OrderedFloat;
use crate::static_btree::error::{Error, Result};
use crate::static_btree::key::{FixedStringKey, Key, KeyType, Max, Min};
use crate::static_btree::query::types::{Operator, QueryCondition};
use crate::static_btree::stree::Stree;
/// Descriptor for a serialized static B-tree index whose search methods read the
/// tree directly from a `Read + Seek` stream via `Stree`.
#[derive(Debug, Clone)]
pub struct StreamIndex<K>
where
    K: Key,
{
    /// Number of items in the tree, as passed to `Stree`'s search functions.
    num_items: usize,
    /// Node fan-out the tree was built with, as passed to `Stree`'s search functions.
    branching_factor: u16,
    /// Offset of this index; stored and exposed via a getter but not read by the search methods.
    index_offset: u64,
    /// Size of the serialized index (presumably in bytes — confirm against the writer).
    length: u64,
    /// Ties the descriptor to its key type without storing a key.
    _marker: PhantomData<K>,
}
impl<K: Key> StreamIndex<K> {
pub fn new(num_items: usize, branching_factor: u16, index_offset: u64, length: u64) -> Self {
Self {
num_items,
branching_factor,
index_offset,
length,
_marker: PhantomData,
}
}
pub fn num_items(&self) -> usize {
self.num_items
}
pub fn branching_factor(&self) -> u16 {
self.branching_factor
}
pub fn index_offset(&self) -> u64 {
self.index_offset
}
pub fn length(&self) -> u64 {
self.length
}
pub fn find_exact_with_reader<R: Read + Seek + ?Sized>(
&self,
reader: &mut R,
key: K,
) -> Result<Vec<u64>> {
let results = Stree::stream_find_exact(reader, self.num_items, self.branching_factor, key)?;
Ok(results.into_iter().map(|item| item.offset as u64).collect())
}
pub fn find_range_with_reader<R: Read + Seek + ?Sized>(
&self,
reader: &mut R,
start: Option<K>,
end: Option<K>,
) -> Result<Vec<u64>> {
let start_position = reader.stream_position()?;
let results = match (start, end) {
(Some(start_key), Some(end_key)) => {
let results = Stree::stream_find_range(
reader,
self.num_items,
self.branching_factor,
start_key,
end_key,
)?;
Ok(results.into_iter().map(|item| item.offset as u64).collect())
}
(Some(start_key), None) => {
let results = Stree::stream_find_range(
reader,
self.num_items,
self.branching_factor,
start_key,
K::max_value(),
)?;
Ok(results.into_iter().map(|item| item.offset as u64).collect())
}
(None, Some(end_key)) => {
let results = Stree::stream_find_range(
reader,
self.num_items,
self.branching_factor,
K::min_value(),
end_key,
)?;
Ok(results.into_iter().map(|item| item.offset as u64).collect())
}
(None, None) => Err(Error::QueryError(
"find_range requires at least one bound".to_string(),
)),
};
reader.seek(SeekFrom::Start(start_position))?;
results
}
}
/// Object-safe union of [`Read`] and [`Seek`], so one `&mut dyn ReadSeek`
/// trait object can be both read from and repositioned.
pub trait ReadSeek: Read + Seek {}

/// Blanket impl: any sized type that is both [`Read`] and [`Seek`] is a [`ReadSeek`].
impl<T> ReadSeek for T where T: Read + Seek {}
/// Type-erased search interface over a [`StreamIndex`] of one concrete key type,
/// so indices for fields with different key types can be stored side by side.
pub trait TypedStreamSearchIndex
where
    Self: Send + Sync,
{
    /// Evaluates `condition` against this index using the tree read from `reader`,
    /// returning the offsets of the matching entries.
    fn execute_query_condition(
        &self,
        reader: &mut dyn ReadSeek,
        condition: &QueryCondition,
    ) -> Result<Vec<u64>>;
}
/// Implements [`TypedStreamSearchIndex`] for `StreamIndex<$key_type>`.
///
/// `$enum_variant` is the `KeyType` variant whose payload is a `$key_type`; a condition
/// carrying any other variant is rejected with `Error::QueryError`.
///
/// The reader is searched from its current position (the start of this field's index
/// when called from `StreamMultiIndex::query`). It is seeked back to that position
/// before returning, on success and on failure alike.
macro_rules! impl_typed_stream_search_index {
    ($key_type:ty, $enum_variant:path) => {
        impl TypedStreamSearchIndex for StreamIndex<$key_type> {
            fn execute_query_condition(
                &self,
                reader: &mut dyn ReadSeek,
                condition: &QueryCondition,
            ) -> Result<Vec<u64>> {
                // Evaluates the operator. Kept separate so the caller can restore the
                // reader position on every exit path, including `?` errors.
                fn evaluate(
                    index: &StreamIndex<$key_type>,
                    reader: &mut dyn ReadSeek,
                    condition: &QueryCondition,
                    key: $key_type,
                ) -> Result<Vec<u64>> {
                    // Eq is a point lookup. Every other operator is an inclusive range
                    // scan, minus the exact matches when the comparison is strict.
                    let (lower, upper, strict) = match condition.operator {
                        Operator::Eq => return index.find_exact_with_reader(reader, key),
                        Operator::Ne => (
                            Some(<$key_type>::min_value()),
                            Some(<$key_type>::max_value()),
                            true,
                        ),
                        Operator::Gt => (Some(key.clone()), None, true),
                        Operator::Ge => (Some(key.clone()), None, false),
                        Operator::Lt => (None, Some(key.clone()), true),
                        Operator::Le => (None, Some(key.clone()), false),
                    };
                    let mut items = index.find_range_with_reader(reader, lower, upper)?;
                    if strict {
                        // A set keeps the exclusion linear instead of O(n·m).
                        let excluded: ::std::collections::HashSet<u64> = index
                            .find_exact_with_reader(reader, key)?
                            .into_iter()
                            .collect();
                        items.retain(|offset| !excluded.contains(offset));
                    }
                    Ok(items)
                }

                let key = match &condition.key {
                    $enum_variant(val) => val.clone(),
                    _ => {
                        return Err(Error::QueryError(format!(
                            "key type mismatch: expected {}, got {:?}",
                            stringify!($key_type),
                            condition.key
                        )))
                    }
                };
                let start_position = reader.stream_position()?;
                let outcome = evaluate(self, reader, condition, key);
                // Always restore the reader; report the search error ahead of a seek error.
                let restored = reader.seek(SeekFrom::Start(start_position));
                let items = outcome?;
                restored?;
                Ok(items)
            }
        }
    };
}
// Signed integer keys.
impl_typed_stream_search_index!(i8, KeyType::Int8);
impl_typed_stream_search_index!(i16, KeyType::Int16);
impl_typed_stream_search_index!(i32, KeyType::Int32);
impl_typed_stream_search_index!(i64, KeyType::Int64);

// Unsigned integer keys.
impl_typed_stream_search_index!(u8, KeyType::UInt8);
impl_typed_stream_search_index!(u16, KeyType::UInt16);
impl_typed_stream_search_index!(u32, KeyType::UInt32);
impl_typed_stream_search_index!(u64, KeyType::UInt64);

// Floating-point keys, wrapped for total ordering.
impl_typed_stream_search_index!(OrderedFloat<f32>, KeyType::Float32);
impl_typed_stream_search_index!(OrderedFloat<f64>, KeyType::Float64);

// Other scalar keys.
impl_typed_stream_search_index!(bool, KeyType::Bool);
impl_typed_stream_search_index!(DateTime<Utc>, KeyType::DateTime);

// Fixed-capacity string keys.
impl_typed_stream_search_index!(FixedStringKey<20>, KeyType::StringKey20);
impl_typed_stream_search_index!(FixedStringKey<50>, KeyType::StringKey50);
impl_typed_stream_search_index!(FixedStringKey<100>, KeyType::StringKey100);
/// A set of per-field stream indices that can be queried together by field name.
pub struct StreamMultiIndex {
    /// Byte range of each field's serialized index, relative to the reader position
    /// at the moment `query` is called.
    index_offsets: HashMap<String, Range<usize>>,
    /// Searchable index per field, type-erased so fields with different key types coexist.
    indices: HashMap<String, Box<dyn TypedStreamSearchIndex>>,
}
impl StreamMultiIndex {
pub fn new() -> Self {
Self {
indices: HashMap::new(),
index_offsets: HashMap::new(),
}
}
pub fn add_index<K: Key + 'static>(&mut self, field: String, index: StreamIndex<K>)
where
StreamIndex<K>: TypedStreamSearchIndex,
{
self.indices.insert(field, Box::new(index));
}
fn add_index_offset(&mut self, field: String, length: u64) {
let largest_offset = self
.index_offsets
.values()
.map(|v| v.end)
.max()
.unwrap_or(0);
self.index_offsets
.insert(field, largest_offset..largest_offset + length as usize);
}
pub fn add_string_index20(
&mut self,
field: String,
index: StreamIndex<FixedStringKey<20>>,
length: u64,
) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_string_index50(
&mut self,
field: String,
index: StreamIndex<FixedStringKey<50>>,
length: u64,
) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_string_index100(
&mut self,
field: String,
index: StreamIndex<FixedStringKey<100>>,
length: u64,
) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_i8_index(&mut self, field: String, index: StreamIndex<i8>, length: u64) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_u8_index(&mut self, field: String, index: StreamIndex<u8>, length: u64) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_i16_index(&mut self, field: String, index: StreamIndex<i16>, length: u64) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_u16_index(&mut self, field: String, index: StreamIndex<u16>, length: u64) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_i32_index(&mut self, field: String, index: StreamIndex<i32>, length: u64) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_i64_index(&mut self, field: String, index: StreamIndex<i64>, length: u64) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_u32_index(&mut self, field: String, index: StreamIndex<u32>, length: u64) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_u64_index(&mut self, field: String, index: StreamIndex<u64>, length: u64) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_f32_index(
&mut self,
field: String,
index: StreamIndex<OrderedFloat<f32>>,
length: u64,
) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_f64_index(
&mut self,
field: String,
index: StreamIndex<OrderedFloat<f64>>,
length: u64,
) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_bool_index(&mut self, field: String, index: StreamIndex<bool>, length: u64) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn add_datetime_index(
&mut self,
field: String,
index: StreamIndex<DateTime<Utc>>,
length: u64,
) {
self.indices.insert(field.clone(), Box::new(index));
self.add_index_offset(field, length);
}
pub fn query(
&self,
reader: &mut dyn ReadSeek,
conditions: &[QueryCondition],
) -> Result<Vec<u64>> {
if conditions.is_empty() {
return Err(Error::QueryError("query cannot be empty".to_string()));
}
let first = &conditions[0];
let indexer = self.indices.get(&first.field).ok_or_else(|| {
Error::QueryError(format!("no index found for field '{}'", first.field))
})?;
let index_range = self.index_offsets.get(&first.field).ok_or_else(|| {
Error::QueryError(format!("no index range found for field '{}'", first.field))
})?;
let start_position = reader.stream_position()?;
reader.seek(SeekFrom::Start(start_position + index_range.start as u64))?;
let mut result_set = indexer.execute_query_condition(reader, first)?;
if result_set.is_empty() {
return Ok(vec![]);
}
reader.seek(SeekFrom::Start(start_position))?;
for cond in &conditions[1..] {
let start_position = reader.stream_position()?;
let indexer = self.indices.get(&cond.field).ok_or_else(|| {
Error::QueryError(format!("no index found for field '{}'", cond.field))
})?;
let index_range = self.index_offsets.get(&cond.field).ok_or_else(|| {
Error::QueryError(format!("no index range found for field '{}'", cond.field))
})?;
let index_start = start_position + index_range.start as u64;
reader.seek(SeekFrom::Start(index_start))?;
println!("index_start: {index_start}");
println!("start_position: {start_position}");
println!("query condition: {cond:?}");
let condition_results = indexer.execute_query_condition(reader, cond)?;
result_set.retain(|offset| condition_results.contains(offset));
if result_set.is_empty() {
return Ok(vec![]); }
reader.seek(SeekFrom::Start(start_position))?;
}
reader.seek(SeekFrom::Start(start_position))?;
Ok(result_set)
}
}
impl Default for StreamMultiIndex {
fn default() -> Self {
Self::new()
}
}