reductstore 1.19.8

ReductStore is a time series database designed specifically for storing and managing large amounts of blob data.
Documentation
// Copyright 2021-2026 ReductSoftware UG
// Licensed under the Apache License, Version 2.0

use std::collections::HashMap;
use std::sync::Arc;

use axum::body::Body;
use axum::extract::{Path, State};
use axum_extra::headers::HeaderMap;

use reduct_base::batch::sort_headers_by_time;

use crate::api::http::entry::common::err_to_batched_header;
use crate::api::http::HttpError;
use crate::api::http::StateKeeper;
use crate::auth::policy::WriteAccessPolicy;
use crate::storage::entry::validate_remove_records;

/// Handler for `DELETE /:bucket/:entry/batch`: removes a batch of records from one entry.
///
/// The records to remove are identified by the timestamps that `sort_headers_by_time`
/// extracts from the request headers. The request body is accepted but not used.
///
/// # Returns
///
/// A fresh header map that `err_to_batched_header` is applied to once for every
/// `(timestamp, error)` pair reported by `remove_records`. When nothing is reported the
/// map is returned empty.
///
/// # Errors
///
/// Returns an `HttpError` as soon as one of these steps fails: the write-permission check
/// for the bucket, `validate_remove_records` for the entry name, parsing of the request
/// headers, the bucket/entry lookup, or the `remove_records` call itself.
///
/// # Panics
///
/// Panics if the path parameters contain no `bucket_name` or no `entry_name` key.
pub(super) async fn remove_batched_records(
    State(keeper): State<Arc<StateKeeper>>,
    headers: HeaderMap,
    Path(path): Path<HashMap<String, String>>,
    _: Body,
) -> Result<HeaderMap, HttpError> {
    let bucket_name = path.get("bucket_name").unwrap();

    // Removing records is checked against the write policy of the target bucket.
    let policy = WriteAccessPolicy {
        bucket: bucket_name,
    };
    let components = keeper.get_with_permissions(&headers, policy).await?;

    let entry_name = path.get("entry_name").unwrap();
    validate_remove_records(entry_name)?;

    let timed_headers: Vec<_> = sort_headers_by_time(&headers)?;

    // The entry is only needed for the removal call, so it lives in a narrow scope.
    let failures = {
        let entry = components
            .storage
            .get_bucket(bucket_name)
            .await?
            .upgrade()?
            .get_entry(entry_name)
            .await?
            .upgrade()?;
        entry
            .remove_records(
                timed_headers
                    .iter()
                    .map(|(timestamp, _)| *timestamp)
                    .collect(),
            )
            .await?
    };

    // Per-record failures are reported through response headers rather than the status.
    let mut response_headers = HeaderMap::new();
    for (timestamp, err) in failures.iter() {
        err_to_batched_header(&mut response_headers, *timestamp, err);
    }

    Ok(response_headers)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::http::tests::{empty_body, headers, keeper, path_to_entry_1};
    use reduct_base::error::{ErrorCode, ReductError};
    use reduct_base::not_found;
    use rstest::rstest;

    // A time header whose suffix is not a timestamp must be rejected with 422.
    #[rstest]
    #[tokio::test]
    async fn test_remove_record_bad_timestamp(
        #[future] keeper: Arc<StateKeeper>,
        mut headers: HeaderMap,
        path_to_entry_1: Path<HashMap<String, String>>,
        #[future] empty_body: Body,
    ) {
        let keeper = keeper.await;
        let body = empty_body.await;
        headers.insert("content-length", "0".parse().unwrap());
        headers.insert("x-reduct-time-yyy", "10".parse().unwrap());

        let result =
            remove_batched_records(State(keeper.clone()), headers, path_to_entry_1, body).await;
        let err = result.err().unwrap();

        let expected = HttpError::new(
            ErrorCode::UnprocessableEntity,
            "Invalid header 'x-reduct-time-yyy': must be an unix timestamp in microseconds",
        );
        assert_eq!(err, expected);
    }

    // Two timestamps are requested: record 0 can no longer be read afterwards, and
    // record 1 is reported as not found through an error header in the response.
    #[rstest]
    #[tokio::test]
    async fn test_remove_batched_records(
        #[future] keeper: Arc<StateKeeper>,
        mut headers: HeaderMap,
        path_to_entry_1: Path<HashMap<String, String>>,
        #[future] empty_body: Body,
    ) {
        let keeper = keeper.await;
        let body = empty_body.await;
        for name in ["x-reduct-time-0", "x-reduct-time-1"] {
            headers.insert(name, "".parse().unwrap());
        }

        let response_headers =
            remove_batched_records(State(keeper.clone()), headers, path_to_entry_1, body)
                .await
                .unwrap();

        // Look the entry up again through the storage to verify the removal.
        let components = keeper.get_anonymous().await.unwrap();
        let bucket = components
            .storage
            .get_bucket("bucket-1")
            .await
            .unwrap()
            .upgrade()
            .unwrap();
        let entry = bucket
            .get_entry("entry-1")
            .await
            .unwrap()
            .upgrade()
            .unwrap();

        let read_err = entry.begin_read(0).await.err().unwrap();
        assert_eq!(
            read_err,
            not_found!("Record 0 not found in entry bucket-1/entry-1")
        );

        assert_eq!(response_headers.len(), 1);
        let reported = response_headers.get("x-reduct-error-1").unwrap();
        assert_eq!(reported, "404,Record 1 not found in entry bucket-1/entry-1");
    }

    // Removing records from the `$meta` entry is refused with 422.
    #[rstest]
    #[tokio::test]
    async fn test_remove_batched_records_meta_forbidden(
        #[future] keeper: Arc<StateKeeper>,
        mut headers: HeaderMap,
        #[future] empty_body: Body,
    ) {
        let keeper = keeper.await;
        let body = empty_body.await;
        headers.insert("x-reduct-time-0", "".parse().unwrap());

        let meta_path = Path(HashMap::from([
            ("bucket_name".to_string(), "bucket-1".to_string()),
            ("entry_name".to_string(), "entry-1/$meta".to_string()),
        ]));

        let result = remove_batched_records(State(keeper), headers, meta_path, body).await;
        let err = result.err().unwrap();

        assert_eq!(err.status(), ErrorCode::UnprocessableEntity);
        assert_eq!(
            err.message(),
            "Can't delete records from system entry 'entry-1/$meta'; use label update with remove=true"
        );
    }
}