reductstore 1.19.8

ReductStore is a time series database designed specifically for storing and managing large amounts of blob data.
Documentation
// Copyright 2021-2026 ReductSoftware UG
// Licensed under the Apache License, Version 2.0

use crate::storage::entry::{Entry, RecordReader};
use crate::storage::proto::record;
use log::debug;
use reduct_base::error::ReductError;
use reduct_base::{internal_server_error, not_found, too_early};

impl Entry {
    /// Starts a new record read.
    ///
    /// The block is first looked up with `find_cached_block` under the block manager's
    /// read lock. If no block is returned, or the returned block has no record for
    /// `time`, the lookup is repeated with `find_block` under the block manager's
    /// write lock.
    ///
    /// # Arguments
    ///
    /// * `time` - The timestamp of the record.
    ///
    /// # Returns
    ///
    /// * `RecordReader` - The record reader to read the record content in chunks.
    ///
    /// # Errors
    ///
    /// * A `not_found!` error if the block returned by `find_block` has no record for `time`.
    /// * A `too_early!` error if the record is in the `Started` state.
    /// * An `internal_server_error!` error if the record is in the `Errored` state.
    /// * Any `ReductError` propagated from acquiring the locks, from `find_block`,
    ///   or from `RecordReader::try_new`.
    pub(crate) async fn begin_read(&self, time: u64) -> Result<RecordReader, ReductError> {
        debug!(
            "Reading record for ts={} in {}/{}",
            time, self.bucket_name, self.name
        );

        // Fast path: only the read lock of the block manager is taken, and it is
        // released as soon as the lookup returns.
        let cached_block = {
            let bm = self.block_manager.read().await?;
            bm.find_cached_block(time)
        };

        let cached_hit = match cached_block {
            Some(block_ref) => {
                let block = block_ref.read().await?;
                let record = block.get_record(time).cloned();
                // Release the block guard before a possible fallback: the fallback takes
                // the block manager write lock and may read-lock this very block again,
                // and nesting those acquisitions under a held guard risks a lock-order
                // inversion with writers.
                drop(block);
                record.map(|record| (block_ref, record))
            }
            None => None,
        };

        let (block_ref, record) = match cached_hit {
            Some(hit) => hit,
            None => {
                // Slow path, shared by "no cached block" and "record not in the cached block".
                let mut bm = self.block_manager.write().await?;
                let block_ref = bm.find_block(time).await?;
                let block = block_ref.read().await?;
                let record = block
                    .get_record(time)
                    .ok_or_else(|| {
                        not_found!(
                            "Record {} not found in block {}/{}/{}",
                            time,
                            self.bucket_name,
                            self.name,
                            block.block_id(),
                        )
                    })?
                    .clone();
                // `block` still borrows `block_ref`, so hand out a clone of the reference.
                (block_ref.clone(), record)
            }
        };

        // A record whose write has not finished yet cannot be read.
        if record.state == record::State::Started as i32 {
            return Err(too_early!(
                "Record with timestamp {} in {}/{} is still being written",
                time,
                self.bucket_name,
                self.name
            ));
        }

        // A record marked as errored is reported as broken.
        if record.state == record::State::Errored as i32 {
            return Err(internal_server_error!(
                "Record with timestamp {} in {}/{} is broken",
                time,
                self.bucket_name,
                self.name
            ));
        }

        RecordReader::try_new(self.block_manager.clone(), block_ref, time, None, None).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::file_cache::FILE_CACHE;
    use crate::storage::block_manager::DATA_FILE_EXT;
    use crate::storage::engine::MAX_IO_BUFFER_SIZE;
    use crate::storage::entry::tests::{
        entry, entry_settings, path, write_record, write_stub_record,
    };
    use crate::storage::entry::EntrySettings;
    use bytes::Bytes;
    use reduct_base::io::ReadRecord;
    use reduct_base::Labels;
    use rstest::rstest;
    use std::path::PathBuf;
    use std::sync::Arc;

    // No record is written in this test, so the read must report a missing record.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_empty(#[future] entry: Arc<Entry>) {
        let entry = entry.await;

        let expected = not_found!("Record 1000 not found in entry bucket/entry");
        let outcome = entry.begin_read(1000).await;

        assert_eq!(outcome.err(), Some(expected));
    }

    // The requested timestamp is smaller than the only written record.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_early(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;

        let expected = not_found!("Record 1000 not found in entry bucket/entry");
        let outcome = entry.begin_read(1000).await;

        assert_eq!(outcome.err(), Some(expected));
    }

    // The requested timestamp is greater than the only written record.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_late(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;

        let expected = not_found!("Record 2000000 not found in block bucket/entry/1000000");
        let outcome = entry.begin_read(2000000).await;

        assert_eq!(outcome.err(), Some(expected));
    }

    // The write declares 10 bytes but sends 50 before finishing; the read must
    // report the record as broken.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_broken(#[future] entry: Arc<Entry>) {
        let entry = entry.await;

        let mut sender = Arc::clone(&entry)
            .begin_write(1000000, 10, String::from("text/plain"), Labels::new())
            .await
            .unwrap();
        let oversized = Bytes::from(vec![0; 50]);
        sender.send(Ok(Some(oversized))).await.unwrap();
        sender.send(Ok(None)).await.unwrap();

        let expected =
            internal_server_error!("Record with timestamp 1000000 in bucket/entry is broken");
        let outcome = entry.begin_read(1000000).await;

        assert_eq!(outcome.err(), Some(expected));
    }

    // The write declares 10 bytes, sends only 5 and is never finished; the read
    // must report that the record is still being written.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_still_written(#[future] entry: Arc<Entry>) {
        let entry = entry.await;

        let mut sender = Arc::clone(&entry)
            .begin_write(1000000, 10, String::from("text/plain"), Labels::new())
            .await
            .unwrap();
        let partial = Bytes::from(vec![0; 5]);
        sender.send(Ok(Some(partial))).await.unwrap();

        let expected =
            too_early!("Record with timestamp 1000000 in bucket/entry is still being written");
        let outcome = entry.begin_read(1000000).await;

        assert_eq!(outcome.err(), Some(expected));
    }

    // The requested timestamp lies between two written records.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_not_found(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        for ts in [1000000, 3000000] {
            write_stub_record(&entry, ts).await;
        }

        let expected = not_found!("Record 2000000 not found in block bucket/entry/1000000");
        let outcome = entry.begin_read(2000000).await;

        assert_eq!(outcome.err(), Some(expected));
    }

    // Same as above, but the block is looked up through the block manager first.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_not_found_in_cached_block(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        for ts in [1000000, 3000000] {
            write_stub_record(&entry, ts).await;
        }

        {
            let mut manager = entry.block_manager.write().await.unwrap();
            let _ = manager.find_block(1000000).await.unwrap();
        }

        let expected = not_found!("Record 2000000 not found in block bucket/entry/1000000");
        let outcome = entry.begin_read(2000000).await;

        assert_eq!(outcome.err(), Some(expected));
    }

    // The data file of the block is removed through the file cache before reading.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_missing_data_file(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;

        let data_path = {
            let manager = entry.block_manager.read().await.unwrap();
            let first_block_id = *manager.index().tree().first().unwrap();
            let file_name = format!("{}{}", first_block_id, DATA_FILE_EXT);
            manager.path().join(file_name)
        };
        FILE_CACHE.remove(&data_path).await.unwrap();

        let expected = not_found!("Data block {} not found", data_path.display());
        let outcome = entry.begin_read(1000000).await;

        assert_eq!(outcome.err(), Some(expected));
    }

    // A single stub record can be read back.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_ok1(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;

        let mut reader = entry.begin_read(1000000).await.unwrap();
        let chunk = reader.read_chunk().unwrap();

        assert_eq!(chunk, Ok(Bytes::from("0123456789")));
    }

    // The second of two stub records can be read back.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_ok2(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        for ts in [1000000, 1010000] {
            write_stub_record(&entry, ts).await;
        }

        let mut reader = entry.begin_read(1010000).await.unwrap();
        let chunk = reader.read_chunk().unwrap();

        assert_eq!(chunk, Ok(Bytes::from("0123456789")));
    }

    // The block is looked up through the block manager before the read.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_uses_cached_block(#[future] entry: Arc<Entry>) {
        let entry = entry.await;
        write_stub_record(&entry, 1000000).await;

        {
            let mut manager = entry.block_manager.write().await.unwrap();
            let _ = manager.find_block(1000000).await.unwrap();
        }

        let mut reader = entry.begin_read(1000000).await.unwrap();
        let chunk = reader.read_chunk().unwrap();

        assert_eq!(chunk, Ok(Bytes::from("0123456789")));
    }

    // A record one byte larger than MAX_IO_BUFFER_SIZE is delivered in two chunks.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_ok_in_chunks(#[future] entry: Arc<Entry>) {
        let entry = entry.await;

        // Mark the first and the last byte so both chunks are distinguishable.
        let mut data = vec![0; MAX_IO_BUFFER_SIZE + 1];
        data[0] = 1;
        data[MAX_IO_BUFFER_SIZE] = 2;
        write_record(&entry, 1000000, data.clone()).await;

        let mut reader = entry.begin_read(1000000).await.unwrap();

        let first_chunk = reader.read_chunk().unwrap().unwrap();
        assert_eq!(first_chunk.to_vec(), data[..MAX_IO_BUFFER_SIZE]);

        let second_chunk = reader.read_chunk().unwrap().unwrap();
        assert_eq!(second_chunk.to_vec(), data[MAX_IO_BUFFER_SIZE..]);

        assert!(reader.read_chunk().is_none());
    }

    // Ten records are written with at most five records per block; a record from
    // the middle of the range must be found.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_search(path: PathBuf) {
        let settings = EntrySettings {
            max_block_size: 10000,
            max_block_records: 5,
        };
        let entry = entry(settings, path).await;

        let step = 100000;
        for index in 0..10 {
            let timestamp = index * step;
            write_stub_record(&entry, timestamp).await;
        }

        let reader = entry.begin_read(5 * step).await.unwrap();
        let found_timestamp = reader.meta().timestamp();

        assert_eq!(found_timestamp, 500000);
    }

    // Reading from an entry that was marked as deleting reports a missing record.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read_when_entry_is_deleted(entry_settings: EntrySettings, path: PathBuf) {
        let entry = entry(entry_settings, path).await;
        entry.mark_deleting().await.unwrap();

        let expected = not_found!("Record 1000 not found in entry bucket/entry");
        let outcome = entry.begin_read(1000).await;

        assert_eq!(outcome.err(), Some(expected));
    }

    // Write a stub record into an explicitly constructed entry and read it back.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_begin_read(entry_settings: EntrySettings, path: PathBuf) {
        let entry = entry(entry_settings, path).await;
        write_stub_record(&entry, 1000000).await;

        let mut reader = entry.begin_read(1000000).await.unwrap();
        let chunk = reader.read_chunk().unwrap();

        assert_eq!(chunk, Ok(Bytes::from("0123456789")));
    }
}