reductstore 1.19.8

ReductStore is a time series database designed specifically for storing and managing large amounts of blob data.
Documentation
// Copyright 2021-2026 ReductSoftware UG
// Licensed under the Apache License, Version 2.0
mod create;
mod get;
mod list;
mod remove;
mod set_mode;
mod update;

use crate::api::http::{HttpError, StateKeeper};
use axum_extra::headers::HeaderMapExt;

use crate::api::http::replication::create::create_replication;
use crate::api::http::replication::get::get_replication;
use crate::api::http::replication::list::list_replications;
use crate::api::http::replication::remove::remove_replication;
use crate::api::http::replication::set_mode::set_mode;
use crate::api::http::replication::update::update_replication;
use axum::body::Body;
use axum::extract::FromRequest;
use axum::http::Request;
use axum::routing::{delete, get, patch, post, put};
use bytes::Bytes;
use reduct_base::msg::replication_api::{
    FullReplicationInfo, ReplicationList, ReplicationModePayload, ReplicationSettings,
};
use reduct_macros::{IntoResponse, Twin};
use std::sync::Arc;

#[derive(IntoResponse, Twin, Debug)]
pub struct ReplicationSettingsAxum(ReplicationSettings);

#[derive(IntoResponse, Twin, Debug)]
pub struct ReplicationModePayloadAxum(ReplicationModePayload);

impl<S> FromRequest<S> for ReplicationModePayloadAxum
where
    Bytes: FromRequest<S>,
    S: Send + Sync,
{
    type Rejection = HttpError;

    async fn from_request(req: Request<Body>, state: &S) -> Result<Self, Self::Rejection> {
        let bytes = Bytes::from_request(req, state).await.map_err(|_| {
            HttpError::new(
                reduct_base::error::ErrorCode::UnprocessableEntity,
                "Invalid body",
            )
        })?;
        let response = match serde_json::from_slice::<ReplicationModePayload>(&*bytes) {
            Ok(x) => Ok(ReplicationModePayloadAxum::from(x)),
            Err(e) => Err(crate::api::http::HttpError::from(e)),
        };
        response
    }
}

impl<S> FromRequest<S> for ReplicationSettingsAxum
where
    Bytes: FromRequest<S>,
    S: Send + Sync,
{
    type Rejection = HttpError;

    async fn from_request(req: Request<Body>, state: &S) -> Result<Self, Self::Rejection> {
        let bytes = Bytes::from_request(req, state).await.map_err(|_| {
            HttpError::new(
                reduct_base::error::ErrorCode::UnprocessableEntity,
                "Invalid body",
            )
        })?;
        let response = match serde_json::from_slice::<ReplicationSettings>(&*bytes) {
            Ok(x) => Ok(ReplicationSettingsAxum::from(x)),
            Err(e) => Err(crate::api::http::HttpError::from(e)),
        };
        response
    }
}

#[derive(IntoResponse, Twin, Default)]
pub(super) struct ReplicationListAxum(ReplicationList);

#[derive(IntoResponse, Twin)]
pub(super) struct ReplicationFullInfoAxum(FullReplicationInfo);

pub(super) fn create_replication_api_routes() -> axum::Router<Arc<StateKeeper>> {
    axum::Router::new()
        .route("/", get(list_replications))
        .route("/{replication_name}", get(get_replication))
        .route("/{replication_name}", post(create_replication))
        .route("/{replication_name}", put(update_replication))
        .route("/{replication_name}/mode", patch(set_mode))
        .route("/{replication_name}", delete(remove_replication))
}

#[cfg(test)]
mod tests {
    use reduct_base::msg::replication_api::{ReplicationMode, ReplicationSettings};
    use reduct_base::Labels;
    use rstest::fixture;

    #[fixture]
    pub(super) fn settings() -> ReplicationSettings {
        ReplicationSettings {
            src_bucket: "bucket-1".to_string(),
            dst_bucket: "bucket-2".to_string(),
            dst_host: "http://localhost".to_string(),
            dst_token: Some("token".to_string()),
            entries: vec![],
            include: Labels::default(),
            exclude: Labels::default(),
            each_n: None,
            each_s: None,
            when: None,
            mode: ReplicationMode::Enabled,
        }
    }

    mod from_request {
        use super::*;
        use crate::api::http::replication::ReplicationModePayloadAxum;
        use axum::body::Body;
        use axum::body::Bytes;
        use axum::extract::FromRequest;
        use axum::http::Request;
        use futures_util::stream;
        use reduct_base::error::ErrorCode::UnprocessableEntity;
        use rstest::rstest;
        use std::io;

        #[rstest]
        #[tokio::test]
        async fn test_replication_mode_payload_ok() {
            let req = Request::builder()
                .body(Body::from(r#"{"mode":"paused"}"#))
                .unwrap();

            let payload = ReplicationModePayloadAxum::from_request(req, &())
                .await
                .expect("parse payload");
            assert_eq!(payload.0.mode, ReplicationMode::Paused);
        }

        #[rstest]
        #[tokio::test]
        async fn test_replication_mode_payload_invalid_json() {
            let req = Request::builder().body(Body::from("{bad json")).unwrap();

            let err = ReplicationModePayloadAxum::from_request(req, &())
                .await
                .expect_err("should fail");
            assert_eq!(err.status(), UnprocessableEntity);
        }

        #[rstest]
        #[tokio::test]
        async fn test_replication_mode_payload_body_error() {
            let stream = stream::once(async {
                Err::<Bytes, _>(io::Error::new(io::ErrorKind::Other, "boom"))
            });
            let req = Request::builder().body(Body::from_stream(stream)).unwrap();

            let err = ReplicationModePayloadAxum::from_request(req, &())
                .await
                .expect_err("should fail");
            assert_eq!(err.status(), UnprocessableEntity);
            assert_eq!(err.message(), "Invalid body");
        }

        #[rstest]
        #[tokio::test]
        async fn test_replication_settings_ok() {
            use crate::api::http::replication::ReplicationSettingsAxum;

            let json = r#"{"src_bucket":"b1","dst_bucket":"b2","dst_host":"http://localhost"}"#;
            let req = Request::builder().body(Body::from(json)).unwrap();

            let payload = ReplicationSettingsAxum::from_request(req, &())
                .await
                .expect("parse settings");
            assert_eq!(payload.0.src_bucket, "b1");
            assert_eq!(payload.0.dst_bucket, "b2");
        }

        #[rstest]
        #[tokio::test]
        async fn test_replication_settings_invalid_json() {
            use crate::api::http::replication::ReplicationSettingsAxum;

            let req = Request::builder().body(Body::from("{bad json")).unwrap();

            let err = ReplicationSettingsAxum::from_request(req, &())
                .await
                .expect_err("should fail");
            assert_eq!(err.status(), UnprocessableEntity);
        }

        #[rstest]
        #[tokio::test]
        async fn test_replication_settings_body_error() {
            use crate::api::http::replication::ReplicationSettingsAxum;

            let stream = stream::once(async {
                Err::<Bytes, _>(io::Error::new(io::ErrorKind::Other, "boom"))
            });
            let req = Request::builder().body(Body::from_stream(stream)).unwrap();

            let err = ReplicationSettingsAxum::from_request(req, &())
                .await
                .expect_err("should fail");
            assert_eq!(err.status(), UnprocessableEntity);
            assert_eq!(err.message(), "Invalid body");
        }
    }
}