use std::path::PathBuf;
use reddb_wire::{parse as wire_parse, ConnectionTarget, ParseErrorKind};
use crate::error::{ClientError, ErrorCode, Result};
/// Connection target resolved from a connection URI by [`parse`].
///
/// Each variant is produced from the matching wire-level
/// `reddb_wire::ConnectionTarget` variant; a wire-level `RedWire` target is
/// reported as [`Target::Grpc`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Target {
/// In-memory target; the tests in this file obtain it from `memory://` and `memory:`.
Memory,
/// File-backed target located at `path`.
File { path: PathBuf },
/// Single endpoint. The tests in this file show single-host `grpc://` and
/// `red://` URIs arriving here as an `http://host:port` string.
Grpc { endpoint: String },
/// Multi-host target; the tests in this file obtain it from a
/// comma-separated host list such as `grpc://a,b`.
GrpcCluster {
/// Endpoint of the first host in the list (per the tests in this file).
primary: String,
/// Endpoints of the remaining hosts, in URI order (per the tests in this file).
replicas: Vec<String>,
/// Set when the URI carries `?route=primary` (per the tests in this file).
force_primary: bool,
},
/// HTTP target. `base_url` is copied unchanged from the wire-level parser;
/// its exact format is not visible in this file — confirm against `reddb_wire`.
Http { base_url: String },
}
/// Parses a connection URI into a [`Target`].
///
/// The URI grammar is handled by `reddb_wire::parse`; this function only
/// translates the wire-level target and error into this crate's types.
///
/// # Errors
///
/// * The error built by [`ClientError::unsupported_scheme`] when the wire
///   parser reports `ParseErrorKind::UnsupportedScheme`.
/// * A [`ClientError`] with [`ErrorCode::InvalidUri`] for every other wire
///   parse failure (`Empty`, `InvalidUri`, `LimitExceeded`), carrying the
///   wire parser's message unchanged.
pub fn parse(uri: &str) -> Result<Target> {
    let wire_target = wire_parse(uri).map_err(|err| match err.kind {
        ParseErrorKind::UnsupportedScheme => {
            // Pass on just the scheme name; if the message lacks the expected
            // prefix, fall back to the whole message.
            let scheme = err
                .message
                .strip_prefix("unsupported scheme: ")
                .unwrap_or(&err.message);
            ClientError::unsupported_scheme(scheme)
        }
        // Every remaining failure kind surfaces to callers as an invalid URI.
        ParseErrorKind::Empty | ParseErrorKind::InvalidUri | ParseErrorKind::LimitExceeded => {
            ClientError::new(ErrorCode::InvalidUri, err.message)
        }
    })?;

    Ok(map_target(wire_target))
}
/// Converts a wire-level [`ConnectionTarget`] into the client-facing [`Target`].
///
/// Every variant maps field-for-field onto the variant of the same name,
/// except `RedWire`, which has no counterpart in [`Target`] and is reported
/// as [`Target::Grpc`].
fn map_target(t: ConnectionTarget) -> Target {
    match t {
        ConnectionTarget::Memory => Target::Memory,
        ConnectionTarget::File { path } => Target::File { path },
        ConnectionTarget::Http { base_url } => Target::Http { base_url },
        ConnectionTarget::Grpc { endpoint } => Target::Grpc { endpoint },
        ConnectionTarget::GrpcCluster {
            primary,
            replicas,
            force_primary,
        } => Target::GrpcCluster {
            primary,
            replicas,
            force_primary,
        },
        // The `tls` flag is deliberately not consulted: the endpoint is always
        // built with the `http://` prefix.
        // NOTE(review): presumably kept for `red://` back-compat — confirm.
        ConnectionTarget::RedWire { host, port, tls: _ } => Target::Grpc {
            endpoint: format!("http://{host}:{port}"),
        },
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `uri` and yields the endpoint of the resulting `Target::Grpc`.
    ///
    /// Any other variant is handed back as `Err` so each test can fail with
    /// its own message. `Result` is spelled out in full because `super::*`
    /// brings the crate's own `Result` alias into scope.
    #[track_caller]
    fn grpc_endpoint(uri: &str) -> std::result::Result<String, Target> {
        match parse(uri).unwrap() {
            Target::Grpc { endpoint } => Ok(endpoint),
            other => Err(other),
        }
    }

    /// Parses `uri` and returns `(primary, replicas, force_primary)` of the
    /// resulting `Target::GrpcCluster`, panicking on any other variant.
    #[track_caller]
    fn cluster_parts(uri: &str) -> (String, Vec<String>, bool) {
        match parse(uri).unwrap() {
            Target::GrpcCluster {
                primary,
                replicas,
                force_primary,
            } => (primary, replicas, force_primary),
            other => panic!("expected GrpcCluster, got {other:?}"),
        }
    }

    // Both spellings of the memory scheme resolve to the same target.
    #[test]
    fn parses_memory() {
        assert_eq!(parse("memory://").unwrap(), Target::Memory);
        assert_eq!(parse("memory:").unwrap(), Target::Memory);
    }

    #[test]
    fn parses_file_with_absolute_path() {
        let parsed = parse("file:///var/lib/reddb/data.rdb").unwrap();
        if let Target::File { path } = parsed {
            assert_eq!(path, PathBuf::from("/var/lib/reddb/data.rdb"));
        } else {
            panic!("expected File");
        }
    }

    // A `grpc://` URI without a port is expected to get port 5055.
    #[test]
    fn parses_grpc_with_default_port() {
        let endpoint = grpc_endpoint("grpc://primary.svc.cluster.local")
            .unwrap_or_else(|_| panic!("expected Grpc"));
        assert_eq!(endpoint, "http://primary.svc.cluster.local:5055");
    }

    // A `red://` URI without a port is expected to get port 5050 and still
    // surface as a `Grpc` target.
    #[test]
    fn parses_red_with_default_port() {
        let endpoint = grpc_endpoint("red://primary.svc.cluster.local")
            .unwrap_or_else(|_| panic!("expected Grpc (back-compat for red://)"));
        assert_eq!(endpoint, "http://primary.svc.cluster.local:5050");
    }

    #[test]
    fn parses_grpc_with_explicit_port() {
        let endpoint =
            grpc_endpoint("grpc://primary:6000").unwrap_or_else(|_| panic!("expected Grpc"));
        assert_eq!(endpoint, "http://primary:6000");
    }

    #[test]
    fn rejects_unknown_scheme() {
        assert_eq!(
            parse("mongodb://localhost").unwrap_err().code,
            ErrorCode::UnsupportedScheme
        );
    }

    #[test]
    fn rejects_empty() {
        let err = parse("").unwrap_err();
        assert_eq!(err.code, ErrorCode::InvalidUri);
    }

    #[test]
    fn rejects_file_without_path() {
        let err = parse("file://").unwrap_err();
        assert_eq!(err.code, ErrorCode::InvalidUri);
    }

    #[test]
    fn parses_grpc_cluster_with_explicit_ports() {
        let (primary, replicas, force_primary) =
            cluster_parts("grpc://primary:5055,replica1:5055,replica2:5055");
        assert_eq!(primary, "http://primary:5055");
        assert_eq!(
            replicas,
            vec!["http://replica1:5055", "http://replica2:5055"]
        );
        assert!(!force_primary);
    }

    // Hosts without a port take the default of the URI's scheme.
    #[test]
    fn cluster_inherits_default_port_per_scheme() {
        let (primary, replicas, _) = cluster_parts("grpc://a,b");
        assert_eq!(primary, "http://a:5055");
        assert_eq!(replicas, vec!["http://b:5055"]);

        let (primary, replicas, _) = cluster_parts("red://a,b");
        assert_eq!(primary, "http://a:5050");
        assert_eq!(replicas, vec!["http://b:5050"]);
    }

    // An explicit port applies only to its own host; the rest keep the default.
    #[test]
    fn cluster_per_host_port_overrides_default() {
        let (primary, replicas, _) = cluster_parts("grpc://a:7000,b:7001,c");
        assert_eq!(primary, "http://a:7000");
        assert_eq!(replicas, vec!["http://b:7001", "http://c:5055"]);
    }

    #[test]
    fn cluster_route_primary_query_param_forces_primary() {
        let (primary, replicas, force_primary) =
            cluster_parts("grpc://primary,replica?route=primary");
        assert_eq!(primary, "http://primary:5055");
        assert_eq!(replicas, vec!["http://replica:5055"]);
        assert!(force_primary, "?route=primary must set force_primary");
    }

    #[test]
    fn cluster_rejects_empty_host_entry() {
        let middle_gap = parse("grpc://primary,,replica").unwrap_err();
        assert_eq!(middle_gap.code, ErrorCode::InvalidUri);

        let leading_gap = parse("grpc://,b").unwrap_err();
        assert_eq!(leading_gap.code, ErrorCode::InvalidUri);
    }

    #[test]
    fn cluster_rejects_invalid_port() {
        let err = parse("grpc://a:nope,b:5055").unwrap_err();
        assert_eq!(err.code, ErrorCode::InvalidUri);
    }

    // A single host must not be promoted to a one-member cluster.
    #[test]
    fn single_host_grpc_still_routes_to_grpc_target_not_cluster() {
        let endpoint = grpc_endpoint("grpc://primary:5055")
            .unwrap_or_else(|other| panic!("expected Grpc (single host), got {other:?}"));
        assert_eq!(endpoint, "http://primary:5055");
    }
}