#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ConsistencyLevel {
All,
#[default]
Quorum,
One,
LocalQuorum,
}
#[derive(Debug, Clone)]
pub struct AmatersConfig {
pub cluster_endpoints: Vec<String>,
pub metadata_keyspace: String,
pub blob_keyspace: String,
pub replication_factor: usize,
pub read_consistency: ConsistencyLevel,
pub write_consistency: ConsistencyLevel,
pub timeout_ms: u64,
pub max_retries: usize,
pub initial_backoff_ms: u64,
pub max_backoff_ms: u64,
pub enable_compression: bool,
pub circuit_breaker_threshold: usize,
pub circuit_breaker_timeout_ms: u64,
}
impl AmatersConfig {
pub const DEFAULT_REPLICATION_FACTOR: usize = 3;
pub fn from_url(url: &str) -> anyhow::Result<Self> {
let rest = url.strip_prefix("amaters://").ok_or_else(|| {
anyhow::anyhow!("AmateRS URL must start with 'amaters://', got: {url}")
})?;
let (rest_no_query, query_string) = match rest.find('?') {
Some(idx) => (&rest[..idx], &rest[idx + 1..]),
None => (rest, ""),
};
let (authority, path_segment) = match rest_no_query.find('/') {
Some(idx) => (&rest_no_query[..idx], &rest_no_query[idx + 1..]),
None => (rest_no_query, ""),
};
if authority.is_empty() {
return Err(anyhow::anyhow!(
"AmateRS URL contains no host endpoints: {url}"
));
}
let cluster_endpoints: Vec<String> = authority
.split(',')
.map(|ep| {
let ep = ep.trim();
if ep.is_empty() {
return Err(anyhow::anyhow!("Empty endpoint in AmateRS URL: {url}"));
}
if !ep.contains(':') {
return Err(anyhow::anyhow!(
"AmateRS endpoint '{ep}' is missing a port (expected host:port)"
));
}
Ok(ep.to_string())
})
.collect::<anyhow::Result<Vec<String>>>()?;
if cluster_endpoints.is_empty() {
return Err(anyhow::anyhow!(
"AmateRS URL contains no valid endpoints: {url}"
));
}
let keyspace = path_segment.trim_matches('/');
let (metadata_keyspace, blob_keyspace) = if keyspace.is_empty() {
("rusmes_metadata".to_string(), "rusmes_blobs".to_string())
} else {
(keyspace.to_string(), format!("{keyspace}_blobs"))
};
let mut cfg = Self {
cluster_endpoints,
metadata_keyspace,
blob_keyspace,
..Self::default()
};
if !query_string.is_empty() {
for pair in query_string.split('&') {
let mut parts = pair.splitn(2, '=');
let key = parts.next().unwrap_or("").trim();
let val = parts.next().unwrap_or("").trim();
match key {
"max_retries" => {
cfg.max_retries = val.parse::<usize>().map_err(|_| {
anyhow::anyhow!(
"AmateRS URL: invalid max_retries value '{val}': expected unsigned integer"
)
})?;
}
"initial_backoff_ms" => {
cfg.initial_backoff_ms = val.parse::<u64>().map_err(|_| {
anyhow::anyhow!(
"AmateRS URL: invalid initial_backoff_ms value '{val}': expected unsigned integer"
)
})?;
}
"max_backoff_ms" => {
cfg.max_backoff_ms = val.parse::<u64>().map_err(|_| {
anyhow::anyhow!(
"AmateRS URL: invalid max_backoff_ms value '{val}': expected unsigned integer"
)
})?;
}
_ => {}
}
}
}
Ok(cfg)
}
}
impl Default for AmatersConfig {
fn default() -> Self {
Self {
cluster_endpoints: vec!["localhost:9042".to_string()],
metadata_keyspace: "rusmes_metadata".to_string(),
blob_keyspace: "rusmes_blobs".to_string(),
replication_factor: Self::DEFAULT_REPLICATION_FACTOR,
read_consistency: ConsistencyLevel::default(),
write_consistency: ConsistencyLevel::default(),
timeout_ms: 10_000,
max_retries: 3,
initial_backoff_ms: 100,
max_backoff_ms: 5_000,
enable_compression: true,
circuit_breaker_threshold: 5,
circuit_breaker_timeout_ms: 60_000,
}
}
}