Skip to main content

reddb_client/
connect.rs

1//! Connection-string parser. Thin shim that delegates to
2//! [`reddb_wire::conn_string`] (the canonical, workspace-shared
3//! parser) and projects its richer [`ConnectionTarget`] vocabulary
4//! onto the legacy [`Target`] enum exposed by this crate.
5//!
6//! Why the shim: the previously-published `reddb-client` driver
7//! had its own copy of the parser that mapped `red://host:port` to
8//! a gRPC endpoint. The shared parser exposes a separate
9//! [`reddb_wire::ConnectionTarget::RedWire`] variant; keeping the
10//! shim preserves the existing public API surface so downstream
11//! `reddb_client::connect::Target` users keep compiling without
12//! changes. Direct callers can opt into the richer vocabulary by
13//! depending on `reddb-wire` and using `reddb_wire::parse` instead.
14
15use std::path::PathBuf;
16
17use reddb_wire::{parse as wire_parse, ConnectionTarget, ParseErrorKind};
18
19use crate::error::{ClientError, ErrorCode, Result};
20
/// The backend a connection URI selects.
///
/// Back-compat note: `red://` and `reds://` URIs do not get a variant of
/// their own here. They are folded onto [`Target::Grpc`] /
/// [`Target::GrpcCluster`], matching the previous driver release. Code
/// that needs the explicit RedWire variant should depend on `reddb-wire`
/// directly.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Target {
    /// Ephemeral, in-memory backend (`memory://`).
    Memory,
    /// Embedded on-disk engine (`file:///abs/path`).
    File {
        /// Path taken from the URI.
        path: PathBuf,
    },
    /// Single-host remote tonic client (`grpc://host:port`).
    ///
    /// `red://host:port` and `reds://host:port` also land here, for
    /// back-compat with the previous driver behaviour.
    Grpc {
        /// Endpoint URL, e.g. `http://host:port`.
        endpoint: String,
    },
    /// Primary plus read-replica fleet
    /// (`grpc://primary:port,replica1:port,replica2:port`).
    ///
    /// Writes always go to `primary`. Reads round-robin across `replicas`,
    /// falling back to `primary` when the replica set is empty or when the
    /// URI carries a `?route=primary` query parameter.
    GrpcCluster {
        /// Endpoint that receives every write.
        primary: String,
        /// Endpoints that reads are spread across.
        replicas: Vec<String>,
        /// `true` when `?route=primary` was present in the URI.
        force_primary: bool,
    },
    /// REST client (`http://host:port` / `https://host:port`).
    Http {
        /// Base URL the REST client talks to.
        base_url: String,
    },
}
50
51/// Parse a connection URI. Pure function, no side effects.
52pub fn parse(uri: &str) -> Result<Target> {
53    let target = wire_parse(uri).map_err(|e| match e.kind {
54        ParseErrorKind::Empty => ClientError::new(ErrorCode::InvalidUri, e.message),
55        ParseErrorKind::InvalidUri => ClientError::new(ErrorCode::InvalidUri, e.message),
56        ParseErrorKind::UnsupportedScheme => {
57            // `e.message` is `"unsupported scheme: <scheme>"`; fall
58            // back to the helper for the canonical wording.
59            let scheme = e
60                .message
61                .strip_prefix("unsupported scheme: ")
62                .unwrap_or(&e.message);
63            ClientError::unsupported_scheme(scheme)
64        }
65        ParseErrorKind::LimitExceeded => {
66            // DoS guardrails added in #90 (max URI bytes, max query
67            // params, max cluster hosts). Surface as InvalidUri with
68            // the structured message intact.
69            ClientError::new(ErrorCode::InvalidUri, e.message)
70        }
71    })?;
72    Ok(map_target(target))
73}
74
75fn map_target(t: ConnectionTarget) -> Target {
76    match t {
77        ConnectionTarget::Memory => Target::Memory,
78        ConnectionTarget::File { path } => Target::File { path },
79        ConnectionTarget::Grpc { endpoint } => Target::Grpc { endpoint },
80        ConnectionTarget::GrpcCluster {
81            primary,
82            replicas,
83            force_primary,
84        } => Target::GrpcCluster {
85            primary,
86            replicas,
87            force_primary,
88        },
89        ConnectionTarget::Http { base_url } => Target::Http { base_url },
90        // Back-compat: previous driver routed `red://` / `reds://`
91        // through the gRPC endpoint, not a separate RedWire path.
92        // Preserve that mapping until downstream code migrates.
93        ConnectionTarget::RedWire { host, port, tls } => {
94            let _ = tls;
95            Target::Grpc {
96                endpoint: format!("http://{host}:{port}"),
97            }
98        }
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    /// Both the `memory://` and the bare `memory:` spellings are accepted.
    #[test]
    fn parses_memory() {
        for uri in ["memory://", "memory:"] {
            assert_eq!(parse(uri).unwrap(), Target::Memory);
        }
    }

    /// A `file://` URI with an absolute path yields that path verbatim.
    #[test]
    fn parses_file_with_absolute_path() {
        if let Target::File { path } = parse("file:///var/lib/reddb/data.rdb").unwrap() {
            assert_eq!(path, PathBuf::from("/var/lib/reddb/data.rdb"));
        } else {
            panic!("expected File");
        }
    }

    /// `grpc://` without a port falls back to 5055.
    #[test]
    fn parses_grpc_with_default_port() {
        if let Target::Grpc { endpoint } = parse("grpc://primary.svc.cluster.local").unwrap() {
            assert_eq!(endpoint, "http://primary.svc.cluster.local:5055");
        } else {
            panic!("expected Grpc");
        }
    }

    /// `red://` without a port falls back to 5050 and is folded onto `Grpc`.
    #[test]
    fn parses_red_with_default_port() {
        if let Target::Grpc { endpoint } = parse("red://primary.svc.cluster.local").unwrap() {
            assert_eq!(endpoint, "http://primary.svc.cluster.local:5050");
        } else {
            panic!("expected Grpc (back-compat for red://)");
        }
    }

    /// An explicit port in the URI is carried into the endpoint.
    #[test]
    fn parses_grpc_with_explicit_port() {
        if let Target::Grpc { endpoint } = parse("grpc://primary:6000").unwrap() {
            assert_eq!(endpoint, "http://primary:6000");
        } else {
            panic!("expected Grpc");
        }
    }

    /// Schemes the parser does not know map to `UnsupportedScheme`.
    #[test]
    fn rejects_unknown_scheme() {
        assert_eq!(
            parse("mongodb://localhost").unwrap_err().code,
            ErrorCode::UnsupportedScheme
        );
    }

    /// The empty string is an invalid URI.
    #[test]
    fn rejects_empty() {
        let err = parse("").unwrap_err();
        assert_eq!(err.code, ErrorCode::InvalidUri);
    }

    /// `file://` with nothing after it is an invalid URI.
    #[test]
    fn rejects_file_without_path() {
        let err = parse("file://").unwrap_err();
        assert_eq!(err.code, ErrorCode::InvalidUri);
    }

    /// A comma-separated host list becomes a cluster: first host is the
    /// primary, the rest are replicas, and `force_primary` is off.
    #[test]
    fn parses_grpc_cluster_with_explicit_ports() {
        match parse("grpc://primary:5055,replica1:5055,replica2:5055").unwrap() {
            Target::GrpcCluster {
                primary,
                replicas,
                force_primary,
            } => {
                assert_eq!(primary, "http://primary:5055");
                assert_eq!(
                    replicas,
                    vec!["http://replica1:5055", "http://replica2:5055"]
                );
                assert!(!force_primary);
            }
            other => panic!("expected GrpcCluster, got {other:?}"),
        }
    }

    /// Hosts without a port pick up the scheme's default: 5055 for
    /// `grpc://`, 5050 for `red://`.
    #[test]
    fn cluster_inherits_default_port_per_scheme() {
        let grpc_cluster = parse("grpc://a,b").unwrap();
        match grpc_cluster {
            Target::GrpcCluster {
                primary, replicas, ..
            } => {
                assert_eq!(primary, "http://a:5055");
                assert_eq!(replicas, vec!["http://b:5055"]);
            }
            other => panic!("expected GrpcCluster, got {other:?}"),
        }

        let red_cluster = parse("red://a,b").unwrap();
        match red_cluster {
            Target::GrpcCluster {
                primary, replicas, ..
            } => {
                assert_eq!(primary, "http://a:5050");
                assert_eq!(replicas, vec!["http://b:5050"]);
            }
            other => panic!("expected GrpcCluster, got {other:?}"),
        }
    }

    /// Per-host ports win; only hosts that omit a port get the default.
    #[test]
    fn cluster_per_host_port_overrides_default() {
        let parsed = parse("grpc://a:7000,b:7001,c").unwrap();
        match parsed {
            Target::GrpcCluster {
                primary, replicas, ..
            } => {
                assert_eq!(primary, "http://a:7000");
                assert_eq!(replicas, vec!["http://b:7001", "http://c:5055"]);
            }
            other => panic!("expected GrpcCluster, got {other:?}"),
        }
    }

    /// `?route=primary` flips `force_primary` on without touching the hosts.
    #[test]
    fn cluster_route_primary_query_param_forces_primary() {
        let parsed = parse("grpc://primary,replica?route=primary").unwrap();
        match parsed {
            Target::GrpcCluster {
                primary,
                replicas,
                force_primary,
            } => {
                assert_eq!(primary, "http://primary:5055");
                assert_eq!(replicas, vec!["http://replica:5055"]);
                assert!(force_primary, "?route=primary must set force_primary");
            }
            other => panic!("expected GrpcCluster, got {other:?}"),
        }
    }

    /// An empty entry anywhere in the host list is rejected.
    #[test]
    fn cluster_rejects_empty_host_entry() {
        for uri in ["grpc://primary,,replica", "grpc://,b"] {
            assert_eq!(parse(uri).unwrap_err().code, ErrorCode::InvalidUri);
        }
    }

    /// A non-numeric port on any cluster host is rejected.
    #[test]
    fn cluster_rejects_invalid_port() {
        let err = parse("grpc://a:nope,b:5055").unwrap_err();
        assert_eq!(err.code, ErrorCode::InvalidUri);
    }

    /// A single host must stay a plain `Grpc` target, never a one-node cluster.
    #[test]
    fn single_host_grpc_still_routes_to_grpc_target_not_cluster() {
        let parsed = parse("grpc://primary:5055").unwrap();
        match parsed {
            Target::Grpc { endpoint } => assert_eq!(endpoint, "http://primary:5055"),
            other => panic!("expected Grpc (single host), got {other:?}"),
        }
    }
}