use crate::storage::entry::{Entry, RecordReader};
use crate::storage::proto::record;
use log::debug;
use reduct_base::error::ReductError;
use reduct_base::{internal_server_error, not_found, too_early};
impl Entry {
    /// Opens a reader for the record stored at timestamp `time`.
    ///
    /// The lookup is done in two steps:
    ///
    /// 1. Under the block manager's read guard, ask for a cached block via
    ///    `find_cached_block`. If one is returned and it holds a record for
    ///    `time`, that block and a copy of the record are used.
    /// 2. Otherwise the block is resolved with `find_block` under the block
    ///    manager's write guard and the record is looked up in that block.
    ///
    /// # Errors
    ///
    /// * a "not found" error if the block returned by `find_block` has no
    ///   record for `time`;
    /// * a "too early" error if the record state is `Started`;
    /// * an "internal server error" if the record state is `Errored`;
    /// * any error produced while acquiring a guard, by `find_block`, or by
    ///   `RecordReader::try_new`.
    pub(crate) async fn begin_read(&self, time: u64) -> Result<RecordReader, ReductError> {
        debug!(
            "Reading record for ts={} in {}/{}",
            time, self.bucket_name, self.name
        );

        // The read guard on the block manager lives only inside this scope.
        let cached_block = {
            let bm = self.block_manager.read().await?;
            bm.find_cached_block(time)
        };

        // Fast path: the record is present in the cached block.
        let cached_hit = match cached_block {
            Some(block_ref) => {
                let block = block_ref.read().await?;
                let record = block.get_record(time).cloned();
                // Release the block guard before anything else is locked, so it is
                // never held while waiting for the block manager's write guard below.
                drop(block);
                record.map(|record| (block_ref, record))
            }
            None => None,
        };

        let (block_ref, record) = match cached_hit {
            Some(hit) => hit,
            None => {
                // Slow path, shared by "no cached block" and "cached block has no
                // such record": resolve the block through the block manager.
                let mut bm = self.block_manager.write().await?;
                let block_ref = bm.find_block(time).await?;
                let block = block_ref.read().await?;
                let record = block
                    .get_record(time)
                    .ok_or_else(|| {
                        not_found!(
                            "Record {} not found in block {}/{}/{}",
                            time,
                            self.bucket_name,
                            self.name,
                            block.block_id(),
                        )
                    })?
                    .clone();
                // `block` is still alive here, so hand out a clone of the reference.
                (block_ref.clone(), record)
            }
        };

        // A record that is still being written cannot be read yet.
        if record.state == record::State::Started as i32 {
            return Err(too_early!(
                "Record with timestamp {} in {}/{} is still being written",
                time,
                self.bucket_name,
                self.name
            ));
        }

        // A record whose write ended in the errored state is reported as broken.
        if record.state == record::State::Errored as i32 {
            return Err(internal_server_error!(
                "Record with timestamp {} in {}/{} is broken",
                time,
                self.bucket_name,
                self.name
            ));
        }

        RecordReader::try_new(self.block_manager.clone(), block_ref, time, None, None).await
    }
}
#[cfg(test)]
mod tests {
    //! Tests for `Entry::begin_read`: error reporting for missing, unfinished
    //! and broken records, and successful reads from cached and non-cached blocks.

    use super::*;
    use crate::core::file_cache::FILE_CACHE;
    use crate::storage::block_manager::DATA_FILE_EXT;
    use crate::storage::engine::MAX_IO_BUFFER_SIZE;
    use crate::storage::entry::tests::{
        entry, entry_settings, path, write_record, write_stub_record,
    };
    use crate::storage::entry::EntrySettings;
    use bytes::Bytes;
    use reduct_base::io::ReadRecord;
    use reduct_base::Labels;
    use rstest::rstest;
    use std::path::PathBuf;
    use std::sync::Arc;

    /// Reading from an entry without any records reports "not found" for the entry.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_empty(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        let reader = entry.begin_read(1000).await;
        assert_eq!(
            reader.err(),
            Some(not_found!("Record 1000 not found in entry bucket/entry"))
        );
    }

    /// A timestamp earlier than the only written record reports "not found" for the entry.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_early(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;
        let reader = entry.begin_read(1000).await;
        assert_eq!(
            reader.err(),
            Some(not_found!("Record 1000 not found in entry bucket/entry"))
        );
    }

    /// A timestamp later than the only written record reports "not found" in that record's block.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_late(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;
        let reader = entry.begin_read(2000000).await;
        assert_eq!(
            reader.err(),
            Some(not_found!(
                "Record 2000000 not found in block bucket/entry/1000000"
            ))
        );
    }

    /// A write opened with `10` that receives 50 bytes and is then finished
    /// leaves a record that `begin_read` reports as broken.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_broken(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        let mut sender = entry
            .clone()
            .begin_write(1000000, 10, "text/plain".to_string(), Labels::new())
            .await
            .unwrap();
        sender
            .send(Ok(Some(Bytes::from(vec![0; 50]))))
            .await
            .unwrap();
        sender.send(Ok(None)).await.unwrap();
        let reader = entry.begin_read(1000000).await;
        assert_eq!(
            reader.err(),
            Some(internal_server_error!(
                "Record with timestamp 1000000 in bucket/entry is broken"
            ))
        );
    }

    /// A write that has received only part of its data and has not been
    /// finished makes `begin_read` report "too early".
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_still_written(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        // `sender` stays alive until the end of the test, so the write is not finished.
        let mut sender = entry
            .clone()
            .begin_write(1000000, 10, "text/plain".to_string(), Labels::new())
            .await
            .unwrap();
        sender
            .send(Ok(Some(Bytes::from(vec![0; 5]))))
            .await
            .unwrap();
        let reader = entry.begin_read(1000000).await;
        assert_eq!(
            reader.err(),
            Some(too_early!(
                "Record with timestamp 1000000 in bucket/entry is still being written"
            ))
        );
    }

    /// A timestamp between two written records reports "not found" in the block.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_not_found(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;
        write_stub_record(&entry, 3000000).await;
        let reader = entry.begin_read(2000000).await;
        assert_eq!(
            reader.err(),
            Some(not_found!(
                "Record 2000000 not found in block bucket/entry/1000000"
            ))
        );
    }

    /// Same as `test_begin_read_not_found`, but the block is looked up through
    /// the block manager first (presumably so that it is cached when
    /// `begin_read` runs).
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_not_found_in_cached_block(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;
        write_stub_record(&entry, 3000000).await;
        {
            let mut bm = entry.block_manager.write().await.unwrap();
            let _ = bm.find_block(1000000).await.unwrap();
        }
        let reader = entry.begin_read(2000000).await;
        assert_eq!(
            reader.err(),
            Some(not_found!(
                "Record 2000000 not found in block bucket/entry/1000000"
            ))
        );
    }

    /// Removing the block's data file through `FILE_CACHE` makes `begin_read`
    /// report that the data block was not found.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_missing_data_file(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;
        // Build the data file path from the first block id in the index.
        let data_path = {
            let bm = entry.block_manager.read().await.unwrap();
            let block_id = *bm.index().tree().first().unwrap();
            bm.path().join(format!("{}{}", block_id, DATA_FILE_EXT))
        };
        FILE_CACHE.remove(&data_path).await.unwrap();
        let reader = entry.begin_read(1000000).await;
        assert_eq!(
            reader.err(),
            Some(not_found!("Data block {} not found", data_path.display()))
        );
    }

    /// A single stub record can be read back.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_ok1(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;
        let mut reader = entry.begin_read(1000000).await.unwrap();
        assert_eq!(reader.read_chunk().unwrap(), Ok(Bytes::from("0123456789")));
    }

    /// The second of two stub records can be read back.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_ok2(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;
        write_stub_record(&entry, 1010000).await;
        let mut reader = entry.begin_read(1010000).await.unwrap();
        assert_eq!(reader.read_chunk().unwrap(), Ok(Bytes::from("0123456789")));
    }

    /// A record can be read after its block has been looked up through the
    /// block manager (presumably served from the cached block).
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_uses_cached_block(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;
        {
            let mut bm = entry.block_manager.write().await.unwrap();
            let _ = bm.find_block(1000000).await.unwrap();
        }
        let mut reader = entry.begin_read(1000000).await.unwrap();
        assert_eq!(reader.read_chunk().unwrap(), Ok(Bytes::from("0123456789")));
    }

    /// A record one byte larger than `MAX_IO_BUFFER_SIZE` is returned in two chunks.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_ok_in_chunks(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        // Mark the first and last bytes so the two chunks are distinguishable.
        let mut data = vec![0; MAX_IO_BUFFER_SIZE + 1];
        data[0] = 1;
        data[MAX_IO_BUFFER_SIZE] = 2;
        write_record(&entry, 1000000, data.clone()).await;
        let mut reader = entry.begin_read(1000000).await.unwrap();
        assert_eq!(
            reader.read_chunk().unwrap().unwrap().to_vec(),
            data[0..MAX_IO_BUFFER_SIZE]
        );
        assert_eq!(
            reader.read_chunk().unwrap().unwrap().to_vec(),
            data[MAX_IO_BUFFER_SIZE..]
        );
        assert_eq!(reader.read_chunk(), None);
    }

    /// With at most 5 records per block, ten records are written and the one
    /// at `5 * step` is found with the expected timestamp.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_search(path: PathBuf) {
        let entry = entry(
            EntrySettings {
                max_block_size: 10000,
                max_block_records: 5,
            },
            path,
        )
        .await;
        let step = 100000;
        for i in 0..10 {
            write_stub_record(&entry, i * step).await;
        }
        let reader = entry.begin_read(5 * step).await.unwrap();
        assert_eq!(reader.meta().timestamp(), 500000);
    }

    /// After `mark_deleting`, reading from the (empty) entry reports "not found".
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_when_entry_is_deleted(entry_settings: EntrySettings, path: PathBuf) {
        let entry = entry(entry_settings, path).await;
        entry.mark_deleting().await.unwrap();
        let reader = entry.begin_read(1000).await;
        assert_eq!(
            reader.err(),
            Some(not_found!("Record 1000 not found in entry bucket/entry"))
        );
    }

    /// A stub record can be read back from an entry built from explicit settings and path.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read(entry_settings: EntrySettings, path: PathBuf) {
        let entry = entry(entry_settings, path).await;
        write_stub_record(&entry, 1000000).await;
        let mut reader = entry.begin_read(1000000).await.unwrap();
        assert_eq!(reader.read_chunk().unwrap(), Ok(Bytes::from("0123456789")));
    }
}