use crate::cfg::io::IoConfig;
use crate::core::sync::AsyncRwLock;
use crate::storage::block_manager::{BlockManager, BlockRef};
use crate::storage::entry::RecordReader;
use crate::storage::proto::record;
use crate::storage::proto::{record::State as RecordState, ts_to_us, Record};
use crate::storage::query::base::{Query, QueryOptions};
use crate::storage::query::condition::Parser;
use crate::storage::query::filters::{
apply_filters_recursively, EachNFilter, EachSecondFilter, ExcludeLabelFilter, FilterRecord,
IncludeLabelFilter, RecordFilter, RecordStateFilter, TimeRangeFilter, WhenFilter,
};
use async_trait::async_trait;
use log::debug;
use reduct_base::error::ErrorCode;
use reduct_base::error::ReductError;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
/// Lets protobuf [`Record`]s flow through the query filter chain.
impl FilterRecord for Record {
    /// Raw protobuf state value of the record (a `record::State` discriminant).
    fn state(&self) -> i32 {
        self.state
    }

    /// Record timestamp in microseconds, or `0` when the record carries none.
    fn timestamp(&self) -> u64 {
        match &self.timestamp {
            Some(ts) => ts_to_us(ts),
            None => 0,
        }
    }

    /// Label name/value pairs, borrowed from the record.
    fn labels(&self) -> HashMap<&String, &String> {
        self.labels
            .iter()
            .map(|label| (&label.name, &label.value))
            .collect()
    }

    /// Replaces every label of the record with the given name/value pairs.
    fn set_labels(&mut self, labels: HashMap<String, String>) {
        let converted = labels
            .into_iter()
            .map(|(name, value)| record::Label { name, value });
        self.labels = converted.collect();
    }

    /// Computed labels are not tracked for protobuf records; always an empty map.
    fn computed_labels(&self) -> HashMap<&String, &String> {
        Default::default()
    }
}
/// One boxed stage of the query's filter chain over protobuf [`Record`]s.
/// The `Send + Sync` bounds let the chain be moved and shared across threads.
type Filter = Box<dyn RecordFilter<Record> + Send + Sync>;
/// Query over the stored records of one entry within a time interval.
///
/// Blocks are visited in order. The records of the current block that pass
/// `filters` are buffered, sorted by timestamp, and handed out one at a time by `next`.
pub struct HistoricalQuery {
    /// Inclusive lower bound of the interval; also where the first block lookup starts.
    start_time: u64,
    /// Exclusive upper bound of the interval; blocks with an id at or above it are not scanned.
    stop_time: u64,
    /// Name of the queried entry, used in log messages and metadata-only readers.
    entry_name: String,
    /// Filtered records of the current block that have not been returned yet, oldest first.
    records_from_current_block: VecDeque<Record>,
    /// Most recently loaded block; `None` until the first block has been loaded.
    current_block: Option<BlockRef>,
    /// Filter chain passed to `apply_filters_recursively` for every record.
    filters: Vec<Filter>,
    /// When true, `next` returns `RecordReader::form_record` readers (metadata only).
    only_metadata: bool,
    /// Set once the filter chain returns `None`; afterwards no more blocks are loaded.
    is_interrupted: bool,
    /// I/O settings reported by `io_settings`; taken from the `when` filter when one is set.
    io_config: IoConfig,
}
impl HistoricalQuery {
    /// Creates a query over the records of `entry_name` with timestamps in
    /// `[start_time, stop_time)`.
    ///
    /// The filter chain always starts with a time-range filter and a filter that keeps
    /// only finished records. The optional filters from `options` follow in this order:
    /// include labels, exclude labels, `each_s`, `each_n`, `when`.
    ///
    /// Metadata-only mode (`options.only_metadata`) is switched off when `options.ext`
    /// is set or when the `when` condition carries an `#ext` directive, because
    /// extensions need the record content.
    ///
    /// # Errors
    ///
    /// Propagates the error of the condition parser or of `WhenFilter::try_new` when
    /// the `when` condition cannot be parsed or turned into a filter.
    pub fn try_new(
        entry_name: String,
        start_time: u64,
        stop_time: u64,
        options: QueryOptions,
        io_defaults: IoConfig,
    ) -> Result<Self, ReductError> {
        let mut filters: Vec<Filter> = Vec::new();
        filters.push(Box::new(TimeRangeFilter::new(start_time, stop_time)));
        filters.push(Box::new(RecordStateFilter::new(RecordState::Finished)));

        if !options.include.is_empty() {
            filters.push(Box::new(IncludeLabelFilter::new(options.include.clone())));
        }
        if !options.exclude.is_empty() {
            filters.push(Box::new(ExcludeLabelFilter::new(options.exclude.clone())));
        }
        // `Option` is iterable: each sampling filter is appended only when configured.
        filters.extend(
            options
                .each_s
                .map(|interval| Box::new(EachSecondFilter::new(interval)) as Filter),
        );
        filters.extend(
            options
                .each_n
                .map(|step| Box::new(EachNFilter::new(step)) as Filter),
        );

        // Extensions need the record content, so an `ext` request disables metadata-only mode.
        let mut only_metadata = options.only_metadata && options.ext.is_none();

        let mut io_config = io_defaults;
        if let Some(when) = options.when {
            let condition_parser = Parser::new();
            let (condition, directives) = condition_parser.parse(when)?;
            if directives.contains_key("#ext") {
                only_metadata = false;
            }
            let when_filter =
                WhenFilter::try_new(condition, directives, io_config, options.strict)?;
            // The `when` filter may override the default I/O settings.
            io_config = when_filter.io_config().clone();
            filters.push(Box::new(when_filter));
        }

        Ok(Self {
            entry_name,
            start_time,
            stop_time,
            records_from_current_block: VecDeque::new(),
            current_block: None,
            filters,
            only_metadata,
            is_interrupted: false,
            io_config,
        })
    }
}
#[async_trait]
impl Query for HistoricalQuery {
    /// Returns a reader for the next record of the query.
    ///
    /// Matching records are buffered one block at a time. When the buffer is empty
    /// and the filter chain has not interrupted the query, the blocks from the
    /// located start block up to (but excluding) `stop_time` are scanned in order
    /// until one yields at least one record, or until the chain interrupts the query.
    ///
    /// # Errors
    ///
    /// * `NoContent` ("No content") when no further record matches.
    /// * Errors from locking the block manager or a block, from loading a block
    ///   (except `TooEarly`/`NotFound`, whose blocks are skipped), from the filter
    ///   chain, or from creating the record reader.
    async fn next(
        &mut self,
        block_manager: Arc<AsyncRwLock<BlockManager>>,
    ) -> Result<RecordReader, ReductError> {
        if self.records_from_current_block.is_empty() && !self.is_interrupted {
            // Resume the lookup at the last record of the block consumed so far;
            // on the first call start at the beginning of the query interval.
            let start = if let Some(block) = &self.current_block {
                let block = block.read().await?;
                block.latest_record_time()
            } else {
                self.start_time
            };

            let block_range = {
                let mut bm = block_manager.write().await?;
                let first_block = match bm.find_block(start).await {
                    Ok(block) => block.read().await?.block_id(),
                    Err(err) if err.status() == ErrorCode::TooEarly => {
                        debug!(
                            "Skip query start block lookup for '{}' due to transient replica state: {}",
                            self.entry_name, err
                        );
                        0
                    }
                    // Any other lookup failure falls back to scanning from the first block.
                    Err(_) => 0,
                };

                // `range(a..b)` panics when `a > b`. Guard against a located block that
                // starts at or after `stop_time` (possible, e.g., when start_time >= stop_time).
                if first_block >= self.stop_time {
                    Vec::new()
                } else {
                    bm.index()
                        .tree()
                        .range(first_block..self.stop_time)
                        .copied()
                        .collect::<Vec<u64>>()
                }
            };

            for block_id in block_range {
                // Hold the block manager lock only while loading the block; filtering
                // below only needs the block reference.
                let block_ref = {
                    let mut bm = block_manager.write().await?;
                    match bm.load_block(block_id).await {
                        Ok(block_ref) => block_ref,
                        Err(err) if err.status() == ErrorCode::TooEarly => {
                            debug!(
                                "Skip transient replica block {} for query '{}': {}",
                                block_id, self.entry_name, err
                            );
                            continue;
                        }
                        Err(err) if err.status() == ErrorCode::NotFound => {
                            debug!(
                                "Skip stale block {} for query '{}': {}",
                                block_id, self.entry_name, err
                            );
                            continue;
                        }
                        Err(err) => return Err(err),
                    }
                };

                self.current_block = Some(block_ref);
                let mut found_records = self.filter_records_from_current_block().await?;
                found_records.sort_by_key(|rec| ts_to_us(rec.timestamp.as_ref().unwrap()));
                self.records_from_current_block.extend(found_records);

                // Stop at the first block with matches, or as soon as the filter chain
                // has interrupted the query: later blocks must not be filtered anymore.
                if self.is_interrupted || !self.records_from_current_block.is_empty() {
                    break;
                }
            }
        }

        let record = match self.records_from_current_block.pop_front() {
            Some(record) => record,
            None => return Err(ReductError::no_content("No content")),
        };

        if self.only_metadata {
            Ok(RecordReader::form_record(&self.entry_name, record))
        } else {
            // Records are buffered only after `current_block` has been set above.
            let block = self
                .current_block
                .as_ref()
                .expect("current block must be set while records are buffered");
            RecordReader::try_new(
                Arc::clone(&block_manager),
                block.clone(),
                ts_to_us(&record.timestamp.unwrap()),
                Some(record),
                None,
            )
            .await
        }
    }

    /// I/O settings of the query (possibly overridden by the `when` filter).
    fn io_settings(&self) -> &IoConfig {
        &self.io_config
    }
}
impl HistoricalQuery {
    /// Runs every record of `current_block` through the filter chain.
    ///
    /// Returns all records the chain emitted. If the chain returns `None` for a
    /// record, the query is marked as interrupted and the remaining records of the
    /// block are not examined.
    ///
    /// Panics if `current_block` is `None`.
    async fn filter_records_from_current_block(&mut self) -> Result<Vec<Record>, ReductError> {
        let block_ref = self.current_block.as_ref().unwrap();
        let block = block_ref.read().await?;

        let mut accepted = Vec::new();
        for candidate in block.record_index().values() {
            let outcome =
                apply_filters_recursively(self.filters.as_mut_slice(), vec![candidate.clone()], 0)?;
            if let Some(passed) = outcome {
                accepted.extend(passed);
            } else {
                // The chain signalled that no further records are wanted.
                self.is_interrupted = true;
                break;
            }
        }
        Ok(accepted)
    }
}
// Unit tests for `HistoricalQuery`. Most tests use the shared `block_manager` fixture
// from `base::tests`. Judging by the assertions below, it holds three finished records
// with timestamps 0 and 5 (in block 0) and 1000 (in a separate block), each with the
// content "0123456789".
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use std::collections::HashMap;

    use super::*;
    use crate::cfg::io::IoConfig;
    use crate::cfg::{Cfg, InstanceRole};
    use crate::core::file_cache::FILE_CACHE;
    use crate::storage::block_manager::block::Block;
    use crate::storage::block_manager::block_index::BlockIndex;
    use crate::storage::proto::record;
    use crate::storage::proto::{us_to_ts, Record};
    use crate::storage::query::base::tests::block_manager;
    use reduct_base::error::ErrorCode;
    use reduct_base::io::BoxedReadRecord;
    use reduct_base::io::ReadRecord;
    use reduct_base::{no_content, not_found};

    // Builds a query over the entry "entry" with default I/O settings.
    fn build_query(
        start_time: u64,
        stop_time: u64,
        options: QueryOptions,
    ) -> Result<HistoricalQuery, ReductError> {
        HistoricalQuery::try_new(
            "entry".to_string(),
            start_time,
            stop_time,
            options,
            IoConfig::default(),
        )
    }

    // Construction-time behaviour of `HistoricalQuery::try_new`.
    mod new {
        use super::*;
        use serde_json::json;

        // `ext` needs the record content, so metadata-only mode must be switched off.
        #[rstest]
        fn override_only_metadata_if_ext_set() {
            let options = QueryOptions {
                only_metadata: true,
                ext: Some(json!({})),
                ..Default::default()
            };
            let query = build_query(0, 1000, options).unwrap();
            assert!(
                !query.only_metadata,
                "only_metadata should be false if ext is set to get content of records"
            );
        }

        // An `#ext` directive inside the `when` condition has the same effect as `ext`.
        #[rstest]
        fn when_directive_ext_overrides_only_metadata() {
            let options = QueryOptions {
                only_metadata: true,
                when: Some(json!({"$and": ["&flag"], "#ext": {}})),
                ..Default::default()
            };
            let query = build_query(0, 1000, options).unwrap();
            assert!(
                !query.only_metadata,
                "only_metadata should be false if #ext directive is set in when condition"
            );
        }
    }

    // `set_labels` replaces the protobuf labels; `labels` reads them back.
    #[test]
    fn test_set_labels() {
        let mut record = Record {
            timestamp: Some(us_to_ts(&100)),
            begin: 0,
            end: 10,
            state: record::State::Finished as i32,
            labels: vec![],
            content_type: "text/plain".to_string(),
        };
        let new_labels = HashMap::from([
            ("key1".to_string(), "value1".to_string()),
            ("key2".to_string(), "value2".to_string()),
        ]);
        record.set_labels(new_labels);
        assert_eq!(record.labels.len(), 2);
        assert_eq!(record.labels()[&"key1".to_string()], "value1");
        assert_eq!(record.labels()[&"key2".to_string()], "value2");
    }

    // The stop time is exclusive: [0, 5) contains only the record at 0.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_query_ok_1_rec(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = build_query(0, 5, QueryOptions::default()).unwrap();
        let records = read_to_vector(&mut query, block_manager).await;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].0.meta().timestamp(), 0);
        assert_eq!(records[0].1, "0123456789");
    }

    // Metadata-only readers must still carry the entry name.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_query_only_metadata_returns_entry_name(
        #[future] block_manager: Arc<AsyncRwLock<BlockManager>>,
    ) {
        let block_manager = block_manager.await;
        let mut query = build_query(
            0,
            5,
            QueryOptions {
                only_metadata: true,
                ..QueryOptions::default()
            },
        )
        .unwrap();
        let reader = query.next(block_manager.clone()).await.unwrap();
        assert_eq!(reader.meta().timestamp(), 0);
        assert_eq!(
            reader.meta().entry_name(),
            "entry",
            "entry_name should be returned for HEAD request (only_metadata=true)"
        );
    }

    // [0, 1000) returns both records of the first block, in timestamp order.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_query_ok_2_recs(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = build_query(0, 1000, QueryOptions::default()).unwrap();
        let records = read_to_vector(&mut query, block_manager).await;
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].0.meta().timestamp(), 0);
        assert_eq!(records[0].1, "0123456789");
        assert_eq!(records[1].0.meta().timestamp(), 5);
        assert_eq!(records[1].1, "0123456789");
    }

    // [0, 1001) crosses into the next block and returns all three records.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_query_ok_3_recs(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = build_query(0, 1001, QueryOptions::default()).unwrap();
        let records = read_to_vector(&mut query, block_manager).await;
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].0.meta().timestamp(), 0);
        assert_eq!(records[0].1, "0123456789");
        assert_eq!(records[1].0.meta().timestamp(), 5);
        assert_eq!(records[1].1, "0123456789");
        assert_eq!(records[2].0.meta().timestamp(), 1000);
        assert_eq!(records[2].1, "0123456789");
    }

    // Persists the fixture, deletes the descriptor file `1000.meta`, and rebuilds a
    // replica block manager from the saved index. The query must skip the block
    // whose descriptor is missing and still return the other two records.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_query_skips_missing_block_descriptor_on_replica(
        #[future] block_manager: Arc<AsyncRwLock<BlockManager>>,
    ) {
        let source_block_manager = block_manager.await;
        let (path, bucket, entry) = {
            let mut bm = source_block_manager.write().await.unwrap();
            bm.save_cache_on_disk().await.unwrap();
            (
                bm.path().clone(),
                bm.bucket_name().to_string(),
                bm.entry_name().to_string(),
            )
        };
        FILE_CACHE.remove(&path.join("1000.meta")).await.unwrap();
        let cfg = Cfg {
            role: InstanceRole::Replica,
            ..Default::default()
        };
        let index = BlockIndex::try_load(path.join("index")).await.unwrap();
        let block_manager = Arc::new(AsyncRwLock::new(
            BlockManager::build(path, index, bucket, entry, Arc::new(cfg))
                .await
                .unwrap(),
        ));
        let mut query = build_query(0, 1001, QueryOptions::default()).unwrap();
        let records = read_to_vector(&mut query, block_manager).await;
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].0.meta().timestamp(), 0);
        assert_eq!(records[1].0.meta().timestamp(), 5);
    }

    // On a replica, registers block 1000 in the index with CRC 1 (presumably not
    // matching the stored data — TODO confirm). Querying [1000, 1001) must end with
    // `NoContent` instead of failing.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_query_does_not_reload_index_on_crc_mismatch_replica(
        #[future] block_manager: Arc<AsyncRwLock<BlockManager>>,
    ) {
        let source_block_manager = block_manager.await;
        let (path, bucket, entry) = {
            let mut bm = source_block_manager.write().await.unwrap();
            bm.save_cache_on_disk().await.unwrap();
            (
                bm.path().clone(),
                bm.bucket_name().to_string(),
                bm.entry_name().to_string(),
            )
        };
        let cfg = Cfg {
            role: InstanceRole::Replica,
            ..Default::default()
        };
        let index = BlockIndex::try_load(path.join("index")).await.unwrap();
        let replica_block_manager = Arc::new(AsyncRwLock::new(
            BlockManager::build(path, index, bucket, entry, Arc::new(cfg))
                .await
                .unwrap(),
        ));
        {
            let mut bm = replica_block_manager.write().await.unwrap();
            bm.index_mut()
                .insert_or_update_with_crc(Block::new(1000), 1);
        }
        let mut query = build_query(1000, 1001, QueryOptions::default()).unwrap();
        let err = query.next(replica_block_manager).await.err().unwrap();
        assert_eq!(err.status(), ErrorCode::NoContent);
    }

    // Only the record labelled block=2 and record=1 (timestamp 1000) matches.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_query_include(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = build_query(
            0,
            1001,
            QueryOptions {
                include: HashMap::from([
                    ("block".to_string(), "2".to_string()),
                    ("record".to_string(), "1".to_string()),
                ]),
                ..QueryOptions::default()
            },
        )
        .unwrap();
        let records = read_to_vector(&mut query, block_manager).await;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].0.meta().timestamp(), 1000);
        assert_eq!(
            records[0].0.meta().labels(),
            &HashMap::from([
                ("block".to_string(), "2".to_string()),
                ("record".to_string(), "1".to_string()),
            ])
        );
        assert_eq!(records[0].1, "0123456789");
    }

    // Excluding block=1 and record=1 removes the record at 0; two records remain,
    // and the first of them is the one at 5.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_query_exclude(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = build_query(
            0,
            1001,
            QueryOptions {
                exclude: HashMap::from([
                    ("block".to_string(), "1".to_string()),
                    ("record".to_string(), "1".to_string()),
                ]),
                ..QueryOptions::default()
            },
        )
        .unwrap();
        let records = read_to_vector(&mut query, block_manager).await;
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].0.meta().timestamp(), 5);
        assert_eq!(
            records[0].0.meta().labels(),
            &HashMap::from([
                ("block".to_string(), "1".to_string()),
                ("record".to_string(), "2".to_string()),
                ("flag".to_string(), "false".to_string()),
            ])
        );
        assert_eq!(records[0].1, "0123456789");
    }

    // Marks the record at 0 as `Errored` and saves the block. The state filter
    // must then hide it, leaving [0, 5) empty.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ignoring_errored_records(
        #[future] block_manager: Arc<AsyncRwLock<BlockManager>>,
    ) {
        let block_manager = block_manager.await;
        let mut query = build_query(0, 5, QueryOptions::default()).unwrap();
        {
            let block_ref = block_manager
                .write()
                .await
                .unwrap()
                .load_block(0)
                .await
                .unwrap();
            {
                let mut block = block_ref.write().await.unwrap();
                let mut record = block.get_record(0).unwrap().clone();
                record.state = record::State::Errored as i32;
                block.insert_or_update_record(record);
            }
            block_manager
                .write()
                .await
                .unwrap()
                .save_block(block_ref)
                .await
                .unwrap();
        }
        assert_eq!(
            query.next(block_manager.clone()).await.err(),
            Some(no_content!("No content"))
        );
    }

    // each_s = 0.00001 (10 µs if the unit is seconds) keeps 0 and 1000 but drops 5.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_each_s_filter(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = build_query(
            0,
            1001,
            QueryOptions {
                each_s: Some(0.00001),
                ..QueryOptions::default()
            },
        )
        .unwrap();
        let records = read_to_vector(&mut query, block_manager).await;
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].0.meta().timestamp(), 0);
        assert_eq!(records[1].0.meta().timestamp(), 1000);
    }

    // each_n = 2 keeps every second record: 0 and 1000.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_each_n_records(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = build_query(
            0,
            1001,
            QueryOptions {
                each_n: Some(2),
                ..QueryOptions::default()
            },
        )
        .unwrap();
        let records = read_to_vector(&mut query, block_manager).await;
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].0.meta().timestamp(), 0);
        assert_eq!(records[1].0.meta().timestamp(), 1000);
    }

    // Only the record at 0 has a truthy `flag` label.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_when_filter(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = build_query(
            0,
            1001,
            QueryOptions {
                when: Some(serde_json::from_str(r#"{"$and": ["&flag"]}"#).unwrap()),
                ..QueryOptions::default()
            },
        )
        .unwrap();
        let records = read_to_vector(&mut query, block_manager).await;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].0.meta().timestamp(), 0);
    }

    // In strict mode, a reference to a missing label is reported as an error.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_when_filter_strict(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = build_query(
            0,
            1001,
            QueryOptions {
                when: Some(serde_json::from_str(r#"{"$and": ["&NOT_EXIST"]}"#).unwrap()),
                strict: true,
                ..QueryOptions::default()
            },
        )
        .unwrap();
        assert_eq!(
            query.next(block_manager.clone()).await.err(),
            Some(not_found!("Reference 'NOT_EXIST' not found"))
        );
    }

    // `$limit` interrupts the filter chain after the first record.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_when_with_interruption(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = build_query(
            0,
            1001,
            QueryOptions {
                when: Some(serde_json::from_str(r#"{"$limit": [1]}"#).unwrap()),
                strict: true,
                ..QueryOptions::default()
            },
        )
        .unwrap();
        let records = read_to_vector(&mut query, block_manager).await;
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].0.meta().timestamp(), 0);
    }

    // In non-strict mode, the failing condition just filters records out.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_when_filter_non_strict(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = build_query(
            0,
            1001,
            QueryOptions {
                when: Some(serde_json::from_str(r#"{"$and": ["&NOT_EXIST"]}"#).unwrap()),
                strict: false,
                ..QueryOptions::default()
            },
        )
        .unwrap();
        assert_eq!(
            query.next(block_manager.clone()).await.err(),
            Some(no_content!("No content")),
            "errored condition should be ignored in non-strict mode"
        );
    }

    // Drains the query, reading each record's full content as UTF-8. The only
    // error accepted as the end of the stream is `NoContent`.
    async fn read_to_vector(
        query: &mut HistoricalQuery,
        block_manager: Arc<AsyncRwLock<BlockManager>>,
    ) -> Vec<(BoxedReadRecord, String)> {
        let mut records: Vec<(BoxedReadRecord, String)> = Vec::new();
        loop {
            match query.next(block_manager.clone()).await {
                Ok(mut reader) => {
                    let mut content = String::new();
                    while let Some(chunk) = reader.read_chunk() {
                        content
                            .push_str(String::from_utf8(chunk.unwrap().to_vec()).unwrap().as_str())
                    }
                    records.push((Box::new(reader), content));
                }
                Err(err) => {
                    assert_eq!(err.status, ErrorCode::NoContent);
                    break;
                }
            }
        }
        records
    }
}