use crate::bucket::BucketBuilder;
use crate::http_client::HttpClient;
use crate::replication::ReplicationBuilder;
use crate::Bucket;
use reduct_base::error::{ErrorCode, ReductError};
use reduct_base::msg::replication_api::{
FullReplicationInfo, ReplicationInfo, ReplicationList, ReplicationMode, ReplicationModePayload,
ReplicationSettings,
};
use reduct_base::msg::server_api::{BucketInfoList, ServerInfo};
use reqwest::{Certificate, Method, Url};
use std::sync::Arc;
use std::time::Duration;
mod token;
pub use token::CreateTokenBuilder;
pub struct ReductClientBuilder {
url: String,
api_token: String,
timeout: Duration,
http1_only: bool,
verify_ssl: bool,
ca_cert_path: Option<String>,
}
pub type Result<T> = std::result::Result<T, ReductError>;
pub(super) static API_BASE: &str = "api/v1";
impl ReductClientBuilder {
fn new() -> Self {
Self {
url: String::new(),
api_token: String::new(),
timeout: Duration::from_secs(30),
http1_only: false,
verify_ssl: true,
ca_cert_path: None,
}
}
pub fn build(self) -> ReductClient {
self.try_build().unwrap()
}
pub fn try_build(self) -> Result<ReductClient> {
if self.url.is_empty() {
return Err(ReductError::new(
ErrorCode::UrlParseError,
"URL must be set",
));
}
ReductError::new(ErrorCode::UrlParseError, "URL must be set");
let builder = reqwest::ClientBuilder::new()
.timeout(self.timeout)
.cookie_store(true)
.danger_accept_invalid_certs(!self.verify_ssl);
let builder = if let Some(ca_cert_path) = self.ca_cert_path {
let certs = std::fs::read(&ca_cert_path)
.map_err(|e| {
ReductError::new(
ErrorCode::Unknown,
&format!("Failed to read CA certificate '{}': {}", ca_cert_path, e),
)
})
.and_then(|pem| {
Certificate::from_pem_bundle(&pem).map_err(|e| {
ReductError::new(
ErrorCode::Unknown,
&format!("Failed to parse CA certificate '{}': {}", ca_cert_path, e),
)
})
})?;
certs
.into_iter()
.fold(builder, |builder, cert| builder.add_root_certificate(cert))
} else {
builder
};
let builder = if self.http1_only {
builder.http1_only()
} else {
builder
};
Ok(ReductClient {
http_client: Arc::new(HttpClient::new(
&self.url,
&self.api_token,
builder.build().map_err(|e| {
ReductError::new(
ErrorCode::Unknown,
&format!("Failed to create HTTP client: {}", e),
)
})?,
)?),
})
}
pub fn url(mut self, url: &str) -> Self {
let url = if !url.ends_with('/') {
format!("{}/", url)
} else {
url.to_string()
};
let url = Url::parse(&url).expect("Invalid URL");
self.url = url
.join(API_BASE)
.expect("Failed to join URL with API base path")
.to_string();
self
}
pub fn api_token(mut self, api_token: &str) -> Self {
self.api_token = api_token.to_string();
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn http1_only(mut self) -> Self {
self.http1_only = true;
self
}
pub fn verify_ssl(mut self, verify_ssl: bool) -> Self {
self.verify_ssl = verify_ssl;
self
}
pub fn ca_cert_path(mut self, ca_cert_path: &str) -> Self {
self.ca_cert_path = Some(ca_cert_path.to_string());
self
}
}
pub struct ReductClient {
http_client: Arc<HttpClient>,
}
impl ReductClient {
pub fn builder() -> ReductClientBuilder {
ReductClientBuilder::new()
}
pub fn url(&self) -> &str {
self.http_client.url()
}
pub fn api_token(&self) -> &str {
self.http_client.api_token()
}
pub async fn server_info(&self) -> Result<ServerInfo> {
self.http_client
.send_and_receive_json::<(), ServerInfo>(Method::GET, "/info", None)
.await
}
pub async fn bucket_list(&self) -> Result<BucketInfoList> {
self.http_client
.send_and_receive_json::<(), BucketInfoList>(Method::GET, "/list", None)
.await
}
pub fn create_bucket(&self, name: &str) -> BucketBuilder {
BucketBuilder::new(name.to_string(), Arc::clone(&self.http_client))
}
pub async fn get_bucket(&self, name: &str) -> Result<Bucket> {
let request = self
.http_client
.request(Method::HEAD, &format!("/b/{}", name));
self.http_client.send_request(request).await?;
Ok(Bucket {
name: name.to_string(),
http_client: self.http_client.clone(),
})
}
pub async fn alive(&self) -> Result<()> {
let request = self.http_client.request(Method::HEAD, "/alive");
self.http_client.send_request(request).await?;
Ok(())
}
pub async fn list_replications(&self) -> Result<Vec<ReplicationInfo>> {
let list = self
.http_client
.send_and_receive_json::<(), ReplicationList>(Method::GET, "/replications", None)
.await?;
Ok(list.replications)
}
pub async fn get_replication(&self, name: &str) -> Result<FullReplicationInfo> {
let info = self
.http_client
.send_and_receive_json::<(), FullReplicationInfo>(
Method::GET,
&format!("/replications/{}", name),
None,
)
.await?;
Ok(info)
}
pub fn create_replication(&self, name: &str) -> ReplicationBuilder {
ReplicationBuilder::new(name.to_string(), Arc::clone(&self.http_client))
}
pub async fn update_replication(
&self,
name: &str,
settings: ReplicationSettings,
) -> Result<()> {
self.http_client
.send_json(Method::PUT, &format!("/replications/{}", name), settings)
.await
}
pub async fn set_replication_mode(&self, name: &str, mode: ReplicationMode) -> Result<()> {
self.http_client
.send_json(
Method::PATCH,
&format!("/replications/{}/mode", name),
ReplicationModePayload { mode },
)
.await
}
pub async fn delete_replication(&self, name: &str) -> Result<()> {
let request = self
.http_client
.request(Method::DELETE, &format!("/replications/{}", name));
self.http_client.send_request(request).await?;
Ok(())
}
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::record::Labels;
use bytes::Bytes;
use reduct_base::msg::bucket_api::{BucketSettings, QuotaType};
use rstest::{fixture, rstest};
use tokio::time::sleep;
mod build {
use super::*;
use std::fs;
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
struct TempCaCert(String);
impl TempCaCert {
fn path(&self) -> &str {
&self.0
}
}
impl Drop for TempCaCert {
fn drop(&mut self) {
let _ = fs::remove_file(&self.0);
}
}
#[fixture]
fn missing_ca_cert_path() -> &'static str {
"/tmp/missing-ca-cert.pem"
}
#[rstest]
#[case("http://domain.com:8333", "http://domain.com:8333/")]
#[case("http://domain.com:8333/", "http://domain.com:8333/")]
#[case("http://domain.com:8333/prefix", "http://domain.com:8333/prefix/")]
#[case("http://domain.com:8333/prefix/", "http://domain.com:8333/prefix/")]
fn test_build_client(#[case] url: &str, #[case] expected_url: &str) {
let client = ReductClient::builder().url(url).build();
assert_eq!(client.url(), expected_url);
}
#[rstest]
fn test_build_client_with_missing_ca_cert(missing_ca_cert_path: &str) {
let err = ReductClient::builder()
.url("https://domain.com:8333")
.ca_cert_path(missing_ca_cert_path)
.try_build()
.err()
.unwrap();
assert_eq!(err.status(), ErrorCode::Unknown);
assert!(err.to_string().contains(&format!(
"Failed to read CA certificate '{}'",
missing_ca_cert_path
)));
}
#[fixture]
fn ca_cert_pem() -> &'static str {
"-----BEGIN CERTIFICATE-----
MIIBtjCCAVugAwIBAgITBmyf1XSXNmY/Owua2eiedgPySjAKBggqhkjOPQQDAjA5
MQswCQYDVQQGEwJVUzEPMA0GA1UEChMGQW1hem9uMRkwFwYDVQQDExBBbWF6b24g
Um9vdCBDQSAzMB4XDTE1MDUyNjAwMDAwMFoXDTQwMDUyNjAwMDAwMFowOTELMAkG
A1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEZMBcGA1UEAxMQQW1hem9uIFJvb3Qg
Q0EgMzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABCmXp8ZBf8ANm+gBG1bG8lKl
ui2yEujSLtf6ycXYqm0fc4E7O5hrOXwzpcVOho6AF2hiRVd9RFgdszflZwjrZt6j
QjBAMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgGGMB0GA1UdDgQWBBSr
ttvXBp43rDCGB5Fwx5zEGbF4wDAKBggqhkjOPQQDAgNJADBGAiEA4IWSoxe3jfkr
BqWTrBqYaGFy+uGh0PsceGCmQ5nFuMQCIQCcAu/xlJyzlvnrxir4tiz+OpAUFteM
YyRIHN8wfdVoOw==
-----END CERTIFICATE-----
"
}
#[fixture]
fn ca_cert(ca_cert_pem: &str) -> TempCaCert {
let cert_path = std::env::temp_dir().join(format!(
"reduct-rs-test-ca-{}.pem",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos()
));
fs::write(&cert_path, ca_cert_pem).unwrap();
TempCaCert(cert_path_to_str(&cert_path).to_string())
}
#[rstest]
fn test_build_client_with_ca_cert(ca_cert: TempCaCert) {
let client = ReductClient::builder()
.url("https://domain.com:8333")
.ca_cert_path(ca_cert.path())
.try_build();
assert!(client.is_ok());
}
fn cert_path_to_str(path: &Path) -> &str {
path.to_str()
.expect("temp certificate path must be valid UTF-8")
}
}
mod serve_api {
use super::*;
#[rstest]
#[tokio::test]
async fn test_server_info(#[future] client: ReductClient) {
let info = client.await.server_info().await.unwrap();
assert!(info.version.starts_with("1."));
assert!(info.bucket_count >= 2);
}
#[test_with::file(/ misc / lic.key)]
#[rstest]
#[tokio::test]
async fn test_server_license(#[future] client: ReductClient) {
let info = client.await.server_info().await.unwrap();
assert_eq!(info.license.unwrap().licensee, "ReductStore");
}
#[rstest]
#[tokio::test]
async fn test_bucket_list(#[future] client: ReductClient) {
let info = client.await.bucket_list().await.unwrap();
assert!(info.buckets.len() >= 2);
}
#[rstest]
#[tokio::test]
async fn test_alive(#[future] client: ReductClient) {
client.await.alive().await.unwrap();
}
}
mod bucket_api {
use super::*;
#[rstest]
#[tokio::test]
async fn test_create_bucket(#[future] client: ReductClient) {
let client = client.await;
let bucket = client.create_bucket("test-bucket").send().await.unwrap();
assert_eq!(bucket.name(), "test-bucket");
}
#[rstest]
#[tokio::test]
async fn test_get_or_create_bucket(#[future] client: ReductClient) {
let client = client.await;
let bucket = client
.create_bucket("test-bucket")
.exist_ok(true)
.send()
.await
.unwrap();
assert_eq!(bucket.name(), "test-bucket");
}
#[rstest]
#[tokio::test]
async fn test_get_bucket(#[future] client: ReductClient) {
let client = client.await;
let bucket = client.get_bucket("test-bucket-1").await.unwrap();
assert_eq!(bucket.name(), "test-bucket-1");
}
}
mod replication_api {
use super::*;
use crate::condition;
use reduct_base::msg::diagnostics::Diagnostics;
use reduct_base::msg::replication_api::{ReplicationMode, ReplicationSettings};
#[rstest]
#[tokio::test]
async fn test_list_replications(#[future] client: ReductClient) {
let replications = client.await.list_replications().await.unwrap();
assert!(replications.is_empty());
}
#[rstest]
#[tokio::test]
async fn test_create_replication(
#[future] client: ReductClient,
settings: ReplicationSettings,
) {
let client = client.await;
client
.create_replication("test-replication")
.src_bucket(settings.src_bucket.as_str())
.dst_bucket(settings.dst_bucket.as_str())
.dst_host(settings.dst_host.as_str())
.dst_token(settings.dst_token.unwrap_or_default().as_str())
.entries(settings.entries.clone())
.when(settings.when.unwrap())
.send()
.await
.unwrap();
let replications = client.list_replications().await.unwrap();
assert_eq!(replications.len(), 1);
}
#[rstest]
#[tokio::test]
async fn test_get_replication(
#[future] client: ReductClient,
settings: ReplicationSettings,
) {
let client = client.await;
client
.create_replication("test-replication")
.set_settings(settings.clone())
.send()
.await
.unwrap();
let replication = client.get_replication("test-replication").await.unwrap();
assert_eq!(
replication.info,
ReplicationInfo {
name: "test-replication".to_string(),
mode: ReplicationMode::Enabled,
is_active: true,
is_provisioned: false,
pending_records: 0,
}
);
assert_eq!(
replication.settings,
ReplicationSettings {
dst_token: None,
..settings
}
);
assert_eq!(replication.diagnostics, Diagnostics::default());
}
#[rstest]
#[tokio::test]
async fn test_update_replication(
#[future] client: ReductClient,
settings: ReplicationSettings,
) {
let client = client.await;
client
.create_replication("test-replication")
.set_settings(settings.clone())
.send()
.await
.unwrap();
let replication = client.get_replication("test-replication").await.unwrap();
assert_eq!(
replication.settings,
ReplicationSettings {
dst_token: None,
..settings
}
);
}
#[rstest]
#[tokio::test]
async fn test_set_replication_mode(
#[future] client: ReductClient,
settings: ReplicationSettings,
) {
let client = client.await;
client
.create_replication("test-replication")
.set_settings(settings.clone())
.send()
.await
.unwrap();
client
.set_replication_mode("test-replication", ReplicationMode::Paused)
.await
.unwrap();
let replication = client.get_replication("test-replication").await.unwrap();
assert_eq!(replication.info.mode, ReplicationMode::Paused);
assert_eq!(replication.settings.mode, ReplicationMode::Paused);
}
#[rstest]
#[tokio::test]
async fn test_delete_replication(
#[future] client: ReductClient,
settings: ReplicationSettings,
) {
let client = client.await;
client
.create_replication("test-replication")
.set_settings(settings.clone())
.send()
.await
.unwrap();
client.delete_replication("test-replication").await.unwrap();
let replications = client.list_replications().await.unwrap();
assert!(replications.is_empty());
}
#[fixture]
fn settings() -> ReplicationSettings {
ReplicationSettings {
src_bucket: "test-bucket-1".to_string(),
dst_bucket: "test-bucket-2".to_string(),
dst_host: "http://127.0.0.1:8383".to_string(),
dst_token: std::env::var("RS_API_TOKEN").ok(),
entries: vec![],
include: Labels::default(),
exclude: Labels::default(),
each_s: Some(1.0),
each_n: Some(1),
when: Some(condition!({"$eq": ["&label", 1]})),
mode: ReplicationMode::Enabled,
}
}
}
#[fixture]
pub(crate) fn bucket_settings() -> BucketSettings {
BucketSettings {
quota_type: Some(QuotaType::FIFO),
quota_size: Some(10_000_000_000),
max_block_size: Some(512),
max_block_records: Some(100),
}
}
#[fixture]
pub(crate) async fn client(bucket_settings: BucketSettings) -> ReductClient {
let client = ReductClient::builder()
.url("http://127.0.0.1:8383")
.api_token(&std::env::var("RS_API_TOKEN").unwrap_or("".to_string()))
.build();
for token in client.list_tokens().await.unwrap() {
if token.name.starts_with("test-token") {
client.delete_token(&token.name).await.unwrap();
}
}
for bucket in client.bucket_list().await.unwrap().buckets {
if bucket.name.starts_with("test-bucket") || bucket.name == "new-bucket" {
let bucket = client.get_bucket(&bucket.name).await.unwrap();
bucket.remove().await.unwrap();
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
for replication in client.list_replications().await.unwrap() {
if replication.name.starts_with("test-replication") {
client.delete_replication(&replication.name).await.unwrap();
}
}
sleep(std::time::Duration::from_millis(100)).await;
let bucket = client
.create_bucket("test-bucket-1")
.settings(bucket_settings.clone())
.send()
.await
.unwrap();
bucket
.write_record("entry-1")
.timestamp_us(1000)
.content_type("text/plain")
.labels(Labels::from([
("entry".into(), "1".into()),
("bucket".into(), "1".into()),
]))
.data(Bytes::from("Hey entry-1!"))
.send()
.await
.unwrap();
bucket
.write_record("entry-2")
.timestamp_us(2000)
.content_type("text/plain")
.labels(Labels::from([
("entry".into(), "2".into()),
("bucket".into(), "1".into()),
]))
.data(Bytes::from("Hey entry-2!"))
.send()
.await
.unwrap();
bucket
.write_record("entry-2")
.timestamp_us(3000)
.data("0")
.send()
.await
.unwrap();
bucket
.write_record("entry-2")
.timestamp_us(4000)
.data("0")
.send()
.await
.unwrap();
let bucket = client
.create_bucket("test-bucket-2")
.settings(bucket_settings)
.send()
.await
.unwrap();
bucket
.write_record("entry-1")
.timestamp_us(1000)
.labels(Labels::from([
("entry".into(), "1".into()),
("bucket".into(), "2".into()),
]))
.data(Bytes::from("Hey entry-1!"))
.send()
.await
.unwrap();
bucket
.write_record("entry-2")
.timestamp_us(2000)
.labels(Labels::from([
("entry".into(), "2".into()),
("bucket".into(), "2".into()),
]))
.data(Bytes::from("Hey entry-2!"))
.send()
.await
.unwrap();
client
}
}