reductstore 1.19.0

ReductStore is a time series database designed specifically for storing and managing large amounts of blob data.
Documentation
// Copyright 2023-2026 ReductSoftware UG
// Licensed under the Business Source License 1.1

use crate::storage::bucket::Bucket;
use crate::storage::entry::Entry;
use log::debug;
use reduct_base::error::ReductError;
use reduct_base::msg::bucket_api::QuotaType;
use reduct_base::{bad_request, internal_server_error};
use std::sync::Arc;

impl Bucket {
    pub(super) async fn keep_quota_for(
        self: &Arc<Self>,
        content_size: u64,
    ) -> Result<(), ReductError> {
        let settings = self.settings.read().await?;
        let quota_size = settings.quota_size.unwrap_or(0);
        match settings.quota_type.clone().unwrap_or(QuotaType::NONE) {
            QuotaType::NONE => Ok(()),
            QuotaType::FIFO => self.remove_oldest_block(content_size, quota_size).await,
            QuotaType::HARD => {
                let entries = self.entries.read().await?;
                let entry_list: Vec<_> = entries.values().cloned().collect();
                drop(entries);

                let mut total_size = 0u64;
                for entry in entry_list {
                    total_size += entry.size().await?;
                }
                if total_size + content_size as u64 > quota_size {
                    Err(bad_request!("Quota of '{}' exceeded", self.name()))
                } else {
                    Ok(())
                }
            }
        }
    }

    async fn remove_oldest_block(
        &self,
        content_size: u64,
        quota_size: u64,
    ) -> Result<(), ReductError> {
        let get_bucket_size = async || {
            let entries = self.entries.read().await?;

            let mut total_size = 0u64;
            for entry in entries.values() {
                total_size += entry.size().await?;
            }
            Ok::<u64, ReductError>(total_size)
        };

        let mut size = get_bucket_size().await? + content_size as u64;
        while size > quota_size {
            let mut success = false;

            {
                debug!(
                    "Need more space. Remove an oldest block from bucket '{}'",
                    self.name()
                );

                let mut candidates: Vec<(u64, &Entry)> = vec![];
                let entries = self.entries.read().await?;
                for (_, entry) in entries.iter() {
                    if !entry.is_eligible_for_fifo_eviction() {
                        continue;
                    }
                    let info = entry.info().await?;
                    candidates.push((info.oldest_record, entry));
                }
                candidates.sort_by_key(|entry| entry.0);

                let candidates = candidates
                    .iter()
                    .map(|(_, entry)| entry.name().to_string())
                    .collect::<Vec<String>>();

                for name in candidates {
                    debug!("Remove an oldest block from entry '{}'", name);
                    match entries.get(&name).unwrap().try_remove_oldest_block().await {
                        Ok(_) => {
                            success = true;
                            break;
                        }
                        Err(e) => {
                            debug!("Failed to remove oldest block from entry '{}': {}", name, e);
                        }
                    }
                }
            }

            if !success {
                return Err(internal_server_error!(
                    "Failed to keep quota of '{}'",
                    self.name()
                ));
            }

            size = get_bucket_size().await? + content_size;
        }

        // Remove empty entries
        let mut entries = self.entries.write().await?;
        let mut names_to_remove = vec![];
        for (name, entry) in entries.iter() {
            if entry.info().await?.record_count != 0 {
                continue;
            }
            names_to_remove.push(name.clone());
        }

        for name in names_to_remove {
            entries.remove(&name);
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use crate::storage::bucket::tests::{bucket, path, write, write_meta};
    use reduct_base::error::ReductError;
    use reduct_base::msg::bucket_api::{BucketSettings, QuotaType};
    use rstest::rstest;
    use std::path::PathBuf;

    #[rstest]
    #[tokio::test]
    async fn test_fifo_quota_keeping(path: PathBuf) {
        let bucket = bucket(
            BucketSettings {
                max_block_size: Some(20),
                quota_type: Some(QuotaType::FIFO),
                quota_size: Some(120),
                max_block_records: Some(100),
            },
            path,
        )
        .await;

        let blob: &[u8] = &[0u8; 40];

        write(&bucket, "test-1", 0, blob).await.unwrap();
        assert_eq!(bucket.clone().info().await.unwrap().info.size, 44);

        write(&bucket, "test-2", 1, blob).await.unwrap();
        assert_eq!(bucket.clone().info().await.unwrap().info.size, 91);

        write(&bucket, "test-3", 2, blob).await.unwrap();
        assert_eq!(bucket.clone().info().await.unwrap().info.size, 94);

        assert_eq!(
            crate::storage::bucket::tests::read(&bucket, "test-1", 0)
                .await
                .err(),
            Some(ReductError::not_found(
                "Entry 'test-1' not found in bucket 'test'"
            ))
        );
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_fifo_quota_ignores_meta_entries_for_eviction(path: PathBuf) {
        let bucket = bucket(
            BucketSettings {
                max_block_size: Some(20),
                quota_type: Some(QuotaType::FIFO),
                quota_size: Some(120),
                max_block_records: Some(100),
            },
            path,
        )
        .await;

        let blob: &[u8] = &[0u8; 40];
        write_meta(&bucket, "data-1/$meta", 0, blob).await.unwrap();
        write(&bucket, "data-1", 1, blob).await.unwrap();
        write(&bucket, "data-2", 2, blob).await.unwrap();

        assert!(crate::storage::bucket::tests::read(&bucket, "data-1", 1)
            .await
            .is_err());
        assert!(
            crate::storage::bucket::tests::read(&bucket, "data-1/$meta", 0)
                .await
                .is_ok()
        );
        assert!(crate::storage::bucket::tests::read(&bucket, "data-2", 2)
            .await
            .is_ok());
    }

    #[rstest]
    #[tokio::test]
    async fn test_hard_quota_keeping(path: PathBuf) {
        let bucket = bucket(
            BucketSettings {
                quota_type: Some(QuotaType::HARD),
                quota_size: Some(100),
                ..BucketSettings::default()
            },
            path,
        )
        .await;

        let blob: &[u8] = &[0u8; 40];
        write(&bucket, "test-1", 0, blob).await.unwrap();
        write(&bucket, "test-2", 1, blob).await.unwrap();

        let err = write(&bucket, "test-3", 2, blob).await.err().unwrap();
        assert_eq!(err, ReductError::bad_request("Quota of 'test' exceeded"));
    }

    #[rstest]
    #[tokio::test]
    async fn test_blob_bigger_than_quota(path: PathBuf) {
        let bucket = bucket(
            BucketSettings {
                max_block_size: Some(5),
                quota_type: Some(QuotaType::FIFO),
                quota_size: Some(10),
                max_block_records: Some(100),
            },
            path,
        )
        .await;

        write(&bucket, "test-1", 0, b"test").await.unwrap();
        bucket.sync_fs().await.unwrap(); // we need to sync to get the correct size
        assert_eq!(bucket.clone().info().await.unwrap().info.size, 22);

        let result = write(&bucket, "test-2", 1, b"0123456789___").await;
        assert_eq!(
            result.err(),
            Some(ReductError::internal_server_error(
                "Failed to keep quota of 'test'"
            ))
        );
    }
}