use crate::cfg::io::IoConfig;
use crate::core::sync::AsyncRwLock;
use crate::storage::block_manager::BlockManager;
use crate::storage::entry::RecordReader;
use crate::storage::query::base::{Query, QueryOptions};
use crate::storage::query::historical::HistoricalQuery;
use async_trait::async_trait;
use reduct_base::error::{ErrorCode, ReductError};
use reduct_base::io::ReadRecord;
use std::sync::Arc;
/// A query that keeps going after its inner [`HistoricalQuery`] runs out of records.
///
/// It wraps a `HistoricalQuery` and, whenever that query reports
/// `ErrorCode::NoContent`, replaces it with a fresh one starting at `next_start`.
pub struct ContinuousQuery {
    /// Entry name handed to every `HistoricalQuery` this wrapper builds.
    entry_name: String,
    /// The inner query currently being drained; swapped out on `NoContent`.
    query: HistoricalQuery,
    /// Start value for the next rebuilt inner query: the timestamp of the last
    /// record returned plus one, or the initial `start` if none was returned yet.
    next_start: u64,
    /// Number of records successfully returned by `next`.
    /// NOTE(review): only written in the code visible here, never read — confirm it is still needed.
    count: usize,
    /// Query options, cloned into each rebuilt inner query.
    options: QueryOptions,
    /// I/O configuration, cloned into each rebuilt inner query.
    io_defaults: IoConfig,
}
impl ContinuousQuery {
    /// Builds a continuous query over `entry_name` beginning at `start`.
    ///
    /// The inner [`HistoricalQuery`] is created with `start` as its lower bound
    /// and `u64::MAX` as its upper bound.
    ///
    /// # Errors
    ///
    /// Returns whatever error `HistoricalQuery::try_new` reports.
    ///
    /// # Panics
    ///
    /// Panics if `options.continuous` is `false`; callers must only construct
    /// this type for continuous queries.
    pub fn try_new(
        entry_name: String,
        start: u64,
        options: QueryOptions,
        io_defaults: IoConfig,
    ) -> Result<Self, ReductError> {
        assert!(options.continuous, "Continuous query must be continuous");

        // Build the inner query from copies so the originals can be stored
        // and reused every time the inner query has to be rebuilt.
        let query = HistoricalQuery::try_new(
            entry_name.clone(),
            start,
            u64::MAX,
            options.clone(),
            io_defaults.clone(),
        )?;

        Ok(Self {
            entry_name,
            query,
            next_start: start,
            count: 0,
            options,
            io_defaults,
        })
    }
}
#[async_trait]
impl Query for ContinuousQuery {
    /// Returns the next record from the inner query.
    ///
    /// On success the resume point (`next_start`) is moved just past the
    /// returned record's timestamp. When the inner query reports
    /// `ErrorCode::NoContent`, it is replaced by a fresh `HistoricalQuery`
    /// starting at `next_start`, and a `NoContent` error with the message
    /// `"No content"` is returned. Any other error is passed through unchanged.
    async fn next(
        &mut self,
        block_manager: Arc<AsyncRwLock<BlockManager>>,
    ) -> Result<RecordReader, ReductError> {
        let failure = match self.query.next(block_manager).await {
            Ok(reader) => {
                // Remember where to resume once the inner query is exhausted.
                self.next_start = reader.meta().timestamp() + 1;
                self.count += 1;
                return Ok(reader);
            }
            Err(failure) => failure,
        };

        if !matches!(failure.status, ErrorCode::NoContent) {
            return Err(failure);
        }

        // The inner query is drained: swap in a new one that starts right
        // after the last record handed out.
        self.query = HistoricalQuery::try_new(
            self.entry_name.clone(),
            self.next_start,
            u64::MAX,
            self.options.clone(),
            self.io_defaults.clone(),
        )?;

        Err(ReductError {
            status: ErrorCode::NoContent,
            message: "No content".to_string(),
        })
    }

    /// Returns the I/O settings of the current inner query.
    fn io_settings(&self) -> &IoConfig {
        self.query.io_settings()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::query::base::tests::block_manager;
    use reduct_base::error::ErrorCode;
    use rstest::rstest;

    /// The query yields the record at timestamp 1000 and then keeps
    /// answering `NoContent` on every further call.
    #[rstest]
    #[tokio::test]
    async fn test_query(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;

        let options = QueryOptions {
            ttl: std::time::Duration::from_millis(100),
            continuous: true,
            ..QueryOptions::default()
        };
        let mut query =
            ContinuousQuery::try_new("entry".to_string(), 900, options, IoConfig::default())
                .unwrap();

        // The reader is a temporary here, so it is dropped before the next call.
        let first_timestamp = query
            .next(block_manager.clone())
            .await
            .unwrap()
            .meta()
            .timestamp();
        assert_eq!(first_timestamp, 1000);

        let no_content = || ReductError {
            status: ErrorCode::NoContent,
            message: "No content".to_string(),
        };

        // Two consecutive polls: the second one exercises the rebuilt inner query.
        for _ in 0..2 {
            assert_eq!(
                query.next(block_manager.clone()).await.err(),
                Some(no_content())
            );
        }
    }
}