reductstore 1.19.8

ReductStore is a time series database designed specifically for storing and managing large amounts of blob data.
Documentation
// Copyright 2021-2026 ReductSoftware UG
// Licensed under the Apache License, Version 2.0

use crate::cfg::io::IoConfig;
use crate::core::sync::AsyncRwLock;
use crate::storage::block_manager::BlockManager;
use crate::storage::entry::RecordReader;
use crate::storage::query::base::{Query, QueryOptions};
use crate::storage::query::historical::HistoricalQuery;
use async_trait::async_trait;
use reduct_base::error::{ErrorCode, ReductError};
use reduct_base::io::ReadRecord;
use std::sync::Arc;

pub struct ContinuousQuery {
    entry_name: String,
    query: HistoricalQuery,
    next_start: u64,
    count: usize,
    options: QueryOptions,
    io_defaults: IoConfig,
}

impl ContinuousQuery {
    pub fn try_new(
        entry_name: String,
        start: u64,
        options: QueryOptions,
        io_defaults: IoConfig,
    ) -> Result<Self, ReductError> {
        if !options.continuous {
            panic!("Continuous query must be continuous");
        }

        Ok(ContinuousQuery {
            entry_name: entry_name.clone(),
            query: HistoricalQuery::try_new(
                entry_name,
                start,
                u64::MAX,
                options.clone(),
                io_defaults.clone(),
            )?,
            next_start: start,
            count: 0,
            options,
            io_defaults,
        })
    }
}

#[async_trait]
impl Query for ContinuousQuery {
    async fn next(
        &mut self,
        block_manager: Arc<AsyncRwLock<BlockManager>>,
    ) -> Result<RecordReader, ReductError> {
        match self.query.next(block_manager).await {
            Ok(reader) => {
                self.next_start = reader.meta().timestamp() + 1;
                self.count += 1;
                Ok(reader)
            }
            Err(ReductError {
                status: ErrorCode::NoContent,
                ..
            }) => {
                self.query = HistoricalQuery::try_new(
                    self.entry_name.clone(),
                    self.next_start,
                    u64::MAX,
                    self.options.clone(),
                    self.io_defaults.clone(),
                )?;
                Err(ReductError {
                    status: ErrorCode::NoContent,
                    message: "No content".to_string(),
                })
            }
            Err(err) => Err(err),
        }
    }

    fn io_settings(&self) -> &IoConfig {
        &self.query.io_settings()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use reduct_base::error::ErrorCode;
    use rstest::rstest;

    use crate::storage::query::base::tests::block_manager;

    #[rstest]
    #[tokio::test]
    async fn test_query(#[future] block_manager: Arc<AsyncRwLock<BlockManager>>) {
        let block_manager = block_manager.await;
        let mut query = ContinuousQuery::try_new(
            "entry".to_string(),
            900,
            QueryOptions {
                ttl: std::time::Duration::from_millis(100),
                continuous: true,
                ..QueryOptions::default()
            },
            IoConfig::default(),
        )
        .unwrap();
        {
            let reader = query.next(block_manager.clone()).await.unwrap();
            assert_eq!(reader.meta().timestamp(), 1000);
        }
        assert_eq!(
            query.next(block_manager.clone()).await.err(),
            Some(ReductError {
                status: ErrorCode::NoContent,
                message: "No content".to_string(),
            })
        );
        assert_eq!(
            query.next(block_manager).await.err(),
            Some(ReductError {
                status: ErrorCode::NoContent,
                message: "No content".to_string(),
            })
        );
    }
}