use crate::cfg::io::IoConfig;
use crate::core::sync::AsyncRwLock;
use crate::storage::block_manager::BlockManager;
use crate::storage::entry::RecordReader;
use async_trait::async_trait;
use reduct_base::error::ReductError;
use reduct_base::msg::entry_api::QueryEntry;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
/// Common interface of the queries that are visible inside `crate::storage`.
///
/// A query is advanced one record at a time through [`Query::next`].
#[async_trait]
pub(in crate::storage) trait Query {
/// Produces the next record of the query.
///
/// Receives the shared, lock-protected block manager and returns a reader for
/// the next record, or a `ReductError`.
///
/// NOTE(review): presumably the end of the result set is also reported through
/// the error variant — confirm with the implementors.
async fn next(
&mut self,
block_manager: Arc<AsyncRwLock<BlockManager>>,
) -> Result<RecordReader, ReductError>;
/// Returns a reference to the I/O configuration held by this query.
fn io_settings(&self) -> &IoConfig;
}
/// Options of a query.
///
/// Can be built from a `QueryEntry` (see the `From<QueryEntry>` impl) or from
/// `Default`, which uses a 60-second TTL, empty maps, `false` flags and `None`
/// for every optional setting.
#[derive(Clone, Debug)]
pub(crate) struct QueryOptions {
/// Time-to-live of the query; 60 seconds when not specified.
pub ttl: Duration,
/// Key/value pairs to include.
/// NOTE(review): presumably label name -> value filters a record must match — confirm.
pub include: HashMap<String, String>,
/// Key/value pairs to exclude.
/// NOTE(review): presumably label name -> value filters that reject a record — confirm.
pub exclude: HashMap<String, String>,
/// Whether the query is continuous.
/// NOTE(review): presumably such a query keeps waiting for new records — confirm.
pub continuous: bool,
/// Optional limit.
/// NOTE(review): presumably the maximum number of records to return — confirm.
pub limit: Option<u64>,
/// Optional `each_n` setting.
/// NOTE(review): presumably "return every n-th record" — confirm.
pub each_n: Option<u64>,
/// Optional `each_s` setting.
/// NOTE(review): presumably "return one record per this many seconds" — confirm.
pub each_s: Option<f64>,
/// Metadata-only flag.
/// NOTE(review): presumably record content is skipped when set — confirm.
pub only_metadata: bool,
/// Optional JSON value taken from `QueryEntry::when`.
/// NOTE(review): presumably a conditional expression evaluated per record — confirm.
pub when: Option<Value>,
/// Strictness flag; `false` when not specified.
/// NOTE(review): presumably controls how errors in `when` are treated — confirm.
pub strict: bool,
/// Optional JSON value taken from `QueryEntry::ext`.
/// Marked `allow(dead_code)`, so it is presumably not read anywhere yet.
#[allow(dead_code)] pub ext: Option<Value>,
}
impl From<QueryEntry> for QueryOptions {
    /// Converts an API-level `QueryEntry` into internal `QueryOptions`.
    ///
    /// Missing values are filled in as follows:
    /// * `ttl` — the TTL of `QueryOptions::default()`; a provided value is
    ///   interpreted as whole seconds,
    /// * `include` / `exclude` — empty maps,
    /// * `continuous`, `only_metadata`, `strict` — `false`.
    ///
    /// `limit`, `each_n`, `each_s`, `when` and `ext` are carried over unchanged.
    fn from(query: QueryEntry) -> QueryOptions {
        // Resolve the fallback lazily: the default options are only constructed
        // when the request carries no TTL, and the default `Duration` is used
        // directly instead of being round-tripped through whole seconds.
        let ttl = query
            .ttl
            .map(Duration::from_secs)
            .unwrap_or_else(|| Self::default().ttl);

        QueryOptions {
            ttl,
            include: query.include.unwrap_or_default(),
            exclude: query.exclude.unwrap_or_default(),
            continuous: query.continuous.unwrap_or(false),
            limit: query.limit,
            each_n: query.each_n,
            each_s: query.each_s,
            only_metadata: query.only_metadata.unwrap_or(false),
            when: query.when,
            strict: query.strict.unwrap_or(false),
            ext: query.ext,
        }
    }
}
impl Default for QueryOptions {
fn default() -> QueryOptions {
QueryOptions {
ttl: Duration::from_secs(60),
include: HashMap::new(),
exclude: HashMap::new(),
continuous: false,
limit: None,
each_n: None,
each_s: None,
only_metadata: false,
when: None,
strict: false,
ext: None,
}
}
}
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::cfg::Cfg;
    use crate::core::file_cache::FILE_CACHE;
    use crate::storage::block_manager::block_index::BlockIndex;
    use crate::storage::proto::record::{Label, State as RecordState};
    use crate::storage::proto::Record;
    use prost_wkt_types::Timestamp;
    use rstest::fixture;
    use std::io::{SeekFrom, Write};
    use tempfile::tempdir;

    /// Builds a record label from a name/value pair.
    fn label(name: &str, value: &str) -> Label {
        Label {
            name: name.to_string(),
            value: value.to_string(),
        }
    }

    /// Shared fixture: a block manager for `bucket/entry` backed by a temporary
    /// directory that is kept on disk (`keep()`), pre-populated with two blocks.
    ///
    /// * Block started with `start_new_block(0, 10)`: two finished records
    ///   (timestamps 0 ns and 5000 ns) labelled `block=1`, `record=1|2` and
    ///   `flag=true|false`.
    /// * Block started with `start_new_block(1000, 10)`: one finished record
    ///   (timestamp 1_000_000 ns) labelled `block=2`, `record=1`.
    ///
    /// Every record spans 10 bytes and its content is `b"0123456789"`.
    #[fixture]
    pub(crate) async fn block_manager() -> Arc<AsyncRwLock<BlockManager>> {
        let dir = tempdir().unwrap().keep().join("bucket").join("entry");
        let mut block_manager = BlockManager::build(
            dir.clone(),
            BlockIndex::new(dir.join("index")),
            "bucket".to_string(),
            "entry".to_string(),
            Cfg::default().into(),
        )
        .await
        .unwrap();

        // --- first block ---------------------------------------------------
        let block_ref = block_manager.start_new_block(0, 10).await.unwrap();
        {
            let mut block = block_ref.write().await.unwrap();
            block.insert_or_update_record(Record {
                timestamp: Some(Timestamp {
                    seconds: 0,
                    nanos: 0,
                }),
                begin: 0,
                end: 10,
                state: RecordState::Finished as i32,
                labels: vec![
                    label("block", "1"),
                    label("record", "1"),
                    label("flag", "true"),
                ],
                content_type: "".to_string(),
            });
            block.insert_or_update_record(Record {
                timestamp: Some(Timestamp {
                    seconds: 0,
                    nanos: 5000,
                }),
                begin: 10,
                end: 20,
                state: RecordState::Finished as i32,
                labels: vec![
                    label("block", "1"),
                    label("record", "2"),
                    label("flag", "false"),
                ],
                content_type: "".to_string(),
            });
        }
        block_manager.save_block(block_ref.clone()).await.unwrap();

        // Writes `$content` at the position the block manager reports for the
        // record `$index` of `$block`.
        macro_rules! write_record {
            ($block:expr, $index:expr, $content:expr) => {{
                let blk = $block.read().await.unwrap();
                let (path, offset) = block_manager.begin_write_record(&blk, $index).unwrap();
                let mut file = FILE_CACHE
                    .write_or_create(&path, SeekFrom::Start(offset))
                    .await
                    .unwrap();
                // `write_all` guarantees the whole payload lands in the file;
                // a bare `write` may stop after a partial write.
                file.write_all($content).unwrap();
                file.flush().unwrap();
            }};
        }

        // NOTE(review): the indices 0 and 5 line up with the record timestamps
        // (0 ns, 5000 ns) if indices are microseconds — confirm.
        write_record!(block_ref, 0, b"0123456789");
        write_record!(block_ref, 5, b"0123456789");
        block_manager.finish_block(block_ref).await.unwrap();

        // --- second block --------------------------------------------------
        let block_ref = block_manager.start_new_block(1000, 10).await.unwrap();
        {
            let mut block = block_ref.write().await.unwrap();
            block.insert_or_update_record(Record {
                timestamp: Some(Timestamp {
                    seconds: 0,
                    nanos: 1_000_000,
                }),
                begin: 0,
                end: 10,
                state: RecordState::Finished as i32,
                labels: vec![label("block", "2"), label("record", "1")],
                content_type: "".to_string(),
            });
        }
        block_manager.save_block(block_ref.clone()).await.unwrap();
        write_record!(block_ref, 1000, b"0123456789");
        block_manager.finish_block(block_ref).await.unwrap();

        Arc::new(AsyncRwLock::new(block_manager))
    }
}