reductstore 1.19.8

ReductStore is a time series database designed specifically for storing and managing large amounts of blob data.
Documentation
// Copyright 2021-2026 ReductSoftware UG
// Licensed under the Apache License, Version 2.0

use crate::api::http::replication::ReplicationSettingsAxum;
use crate::api::http::{HttpError, StateKeeper};
use crate::auth::policy::FullAccessPolicy;
use axum::extract::{Path, State};
use axum_extra::headers::HeaderMap;
use std::sync::Arc;

// PUT /api/v1/replications/:replication_name
pub(super) async fn update_replication(
    State(keeper): State<Arc<StateKeeper>>,
    Path(replication_name): Path<String>,
    headers: HeaderMap,
    settings: ReplicationSettingsAxum,
) -> Result<(), HttpError> {
    let components = keeper
        .get_with_permissions(&headers, FullAccessPolicy {})
        .await?;
    components
        .replication_repo
        .write()
        .await?
        .update_replication(&replication_name, settings.into())
        .await?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::http::replication::tests::settings;
    use crate::api::http::tests::{headers, keeper};
    use reduct_base::msg::replication_api::ReplicationSettings;
    use rstest::{fixture, rstest};
    use std::sync::Arc;

    #[rstest]
    #[tokio::test]
    async fn test_update_replication_ok(
        #[future] keeper_with_bucket: Arc<StateKeeper>,
        headers: HeaderMap,
        mut settings: ReplicationSettings,
    ) {
        let keeper = keeper_with_bucket.await;
        let components = keeper.get_anonymous().await.unwrap();
        settings.dst_bucket = "bucket-3".to_string();

        update_replication(
            State(Arc::clone(&keeper)),
            Path("test".to_string()),
            headers,
            ReplicationSettingsAxum::from(settings),
        )
        .await
        .unwrap();

        assert_eq!(
            components
                .replication_repo
                .read()
                .await
                .unwrap()
                .get_replication_settings("test")
                .await
                .unwrap()
                .dst_bucket,
            "bucket-3"
        );
    }

    #[rstest]
    #[tokio::test]
    async fn test_update_replication_error(
        #[future] keeper_with_bucket: Arc<StateKeeper>,
        headers: HeaderMap,
        mut settings: ReplicationSettings,
    ) {
        settings.dst_host = "BROKEN URL".to_string();

        let result = update_replication(
            State(keeper_with_bucket.await),
            Path("test".to_string()),
            headers,
            ReplicationSettingsAxum::from(settings),
        )
        .await;

        assert!(result.is_err());
    }

    #[fixture]
    async fn keeper_with_bucket(
        #[future] keeper: Arc<StateKeeper>,
        settings: ReplicationSettings,
    ) -> Arc<StateKeeper> {
        let keeper = keeper.await;
        let components = keeper.get_anonymous().await.unwrap();
        components
            .replication_repo
            .write()
            .await
            .unwrap()
            .create_replication("test", settings)
            .await
            .unwrap();
        keeper
    }
}