// reductstore 1.19.0
//
// ReductStore is a time series database designed specifically for storing and managing large amounts of blob data.
// Documentation
// Copyright 2025 ReductSoftware UG
// Licensed under the Business Source License 1.1

use crate::cfg::{CfgParser, ExtCfgBounds};
use crate::core::env::{Env, GetEnv};
use crate::storage::engine::StorageEngine;
use bytesize::ByteSize;
use log::{error, info};
use reduct_base::error::ErrorCode;
use reduct_base::msg::bucket_api::BucketSettings;
use std::collections::HashMap;
use std::path::PathBuf;

impl<EnvGetter: GetEnv, ExtCfg: ExtCfgBounds> CfgParser<EnvGetter, ExtCfg> {
    pub(in crate::cfg) async fn provision_buckets(&self, data_path: &PathBuf) -> StorageEngine {
        let builder = StorageEngine::builder()
            .with_cfg(self.cfg.clone())
            .with_data_path(data_path.clone());

        let storage = if let Some(license) = self.license.clone() {
            builder.with_license(license).build().await
        } else {
            builder.build().await
        };

        for (name, settings) in &self.cfg.buckets {
            let settings = match storage.create_bucket(&name, settings.clone()).await {
                Ok(bucket) => {
                    let bucket = bucket.upgrade().unwrap();
                    bucket.set_provisioned(true);
                    Ok(bucket.settings().await)
                }
                Err(e) => {
                    if e.status() == ErrorCode::Conflict {
                        let bucket = storage.get_bucket(&name).await.unwrap().upgrade().unwrap();
                        bucket.set_provisioned(false);
                        bucket.set_settings(settings.clone()).await.unwrap();
                        bucket.set_provisioned(true);

                        Ok(bucket.settings().await)
                    } else {
                        Err(e)
                    }
                }
            };

            if let Ok(settings) = settings {
                info!("Provisioned bucket '{}' with: {:?}", name, settings);
            } else {
                error!(
                    "Failed to provision bucket '{}': {}",
                    name,
                    settings.err().unwrap()
                );
            }
        }
        storage
    }

    pub(in crate::cfg) fn parse_buckets(
        env: &mut Env<EnvGetter>,
    ) -> HashMap<String, BucketSettings> {
        let mut buckets = HashMap::<String, (String, BucketSettings)>::new();
        for (id, name) in env.matches("RS_BUCKET_(.*)_NAME") {
            buckets.insert(id, (name, BucketSettings::default()));
        }

        for (id, bucket) in &mut buckets {
            let settings = &mut bucket.1;
            settings.quota_type = env.get_optional(&format!("RS_BUCKET_{}_QUOTA_TYPE", id));

            settings.quota_size = env
                .get_optional::<ByteSize>(&format!("RS_BUCKET_{}_QUOTA_SIZE", id))
                .map(|s| s.as_u64());
            settings.max_block_size = env
                .get_optional::<ByteSize>(&format!("RS_BUCKET_{}_MAX_BLOCK_SIZE", id))
                .map(|s| s.as_u64());
            settings.max_block_records =
                env.get_optional(&format!("RS_BUCKET_{}_MAX_BLOCK_RECORDS", id));
        }

        buckets
            .into_iter()
            .map(|(_id, (name, settings))| (name, settings))
            .collect()
    }
}

// Tests for provisioning buckets from `RS_BUCKET_*` environment variables, driven through a
// mocked environment getter.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cfg::tests::MockEnvGetter;
    use crate::storage::bucket::Bucket;
    use mockall::predicate::eq;
    use reduct_base::error::ReductError;
    use reduct_base::msg::bucket_api::QuotaType::FIFO;
    use reduct_base::not_found;
    use rstest::{fixture, rstest};
    use std::collections::BTreeMap;
    use std::env::VarError;
    use test_log::test as log_test;

    // Every bucket option is supplied through the environment; the resulting bucket must be
    // flagged as provisioned and report exactly those values.
    #[log_test(rstest)]
    #[tokio::test]
    async fn test_buckets(mut env_with_buckets: MockEnvGetter) {
        env_with_buckets
            .expect_get()
            .with(eq("RS_BUCKET_1_NAME"))
            .return_const(Ok("bucket1".to_string()));
        env_with_buckets
            .expect_get()
            .with(eq("RS_BUCKET_1_QUOTA_TYPE"))
            .return_const(Ok("FIFO".to_string()));
        env_with_buckets
            .expect_get()
            .with(eq("RS_BUCKET_1_QUOTA_SIZE"))
            .return_const(Ok("1GB".to_string()));
        env_with_buckets
            .expect_get()
            .with(eq("RS_BUCKET_1_MAX_BLOCK_SIZE"))
            .return_const(Ok("1MB".to_string()));
        env_with_buckets
            .expect_get()
            .with(eq("RS_BUCKET_1_MAX_BLOCK_RECORDS"))
            .return_const(Ok("1000".to_string()));

        // Catch-all, registered after the specific matchers: any other variable is reported
        // as not present.
        env_with_buckets
            .expect_get()
            .return_const(Err(VarError::NotPresent));

        let cfg = CfgParser::from_env(env_with_buckets, "0.0.0").await;
        let components = cfg.build().await.unwrap();

        let bucket1 = components
            .storage
            .get_bucket("bucket1")
            .await
            .unwrap()
            .upgrade_and_unwrap();

        assert!(bucket1.is_provisioned());
        let settings = bucket1.settings().await.unwrap();
        assert_eq!(settings.quota_type, Some(FIFO));
        // "1GB" and "1MB" are expected as decimal sizes (10^9 and 10^6 bytes).
        assert_eq!(settings.quota_size, Some(1_000_000_000));
        assert_eq!(settings.max_block_size, Some(1_000_000));
        assert_eq!(settings.max_block_records, Some(1000));
    }

    // Only the bucket name is supplied; the bucket must report `Bucket::defaults()`.
    #[log_test(rstest)]
    #[tokio::test]
    async fn test_buckets_defaults(mut env_with_buckets: MockEnvGetter) {
        env_with_buckets
            .expect_get()
            .with(eq("RS_BUCKET_1_NAME"))
            .return_const(Ok("bucket1".to_string()));

        // Catch-all: every other variable is reported as not present.
        env_with_buckets
            .expect_get()
            .return_const(Err(VarError::NotPresent));

        let cfg = CfgParser::from_env(env_with_buckets, "0.0.0").await;
        let components = cfg.build().await.unwrap();
        let bucket1 = components
            .storage
            .get_bucket("bucket1")
            .await
            .unwrap()
            .upgrade_and_unwrap();

        assert_eq!(
            bucket1.settings().await.unwrap(),
            Bucket::defaults(),
            "use defaults if env vars are not set"
        );
    }

    // The bucket name is `$$$$$`: building the components must still succeed, and no bucket
    // with that name may exist afterwards.
    #[log_test(rstest)]
    #[tokio::test]
    async fn test_buckets_bad_name(mut env_with_buckets: MockEnvGetter) {
        env_with_buckets
            .expect_get()
            .with(eq("RS_BUCKET_1_NAME"))
            .return_const(Ok("$$$$$".to_string()));

        // Catch-all: every other variable is reported as not present.
        env_with_buckets
            .expect_get()
            .return_const(Err(VarError::NotPresent));

        let cfg = CfgParser::from_env(env_with_buckets, "0.0.0").await;
        let components = cfg.build().await.unwrap();

        assert_eq!(
            components.storage.get_bucket("$$$$$").await.err().unwrap(),
            not_found!("Bucket '$$$$$' is not found")
        );
    }

    // Shared fixture: a mock environment whose `RS_DATA_PATH` points at a fresh temporary
    // directory and whose `all()` listing contains the single variable `RS_BUCKET_1_NAME`.
    // Each test adds its own `get` expectations on top of this.
    #[fixture]
    fn env_with_buckets() -> MockEnvGetter {
        let tmp = tempfile::tempdir().unwrap();
        let mut mock_getter = MockEnvGetter::new();
        // NOTE(review): `keep()` presumably persists the directory past this fixture and
        // yields its path - confirm against the tempfile version in use.
        mock_getter
            .expect_get()
            .with(eq("RS_DATA_PATH"))
            .return_const(Ok(tmp.keep().to_str().unwrap().to_string()));
        mock_getter.expect_all().returning(|| {
            let mut map = BTreeMap::new();
            map.insert("RS_BUCKET_1_NAME".to_string(), "bucket1".to_string());
            map
        });

        mock_getter
    }
}