Skip to main content

reddb_wire/
conn_string.rs

1//! Connection-string parser shared across `reddb`, `reddb-client`,
2//! `red_client`, and every language driver.
3//!
4//! Pure function over a string; no I/O, no allocation beyond what the
5//! returned [`ConnectionTarget`] needs. The grammar is defined by
6//! `docs/clients/connection-strings.md`; this module is the canonical
7//! parser and is the single source of truth consumed by the rest of
8//! the workspace.
9//!
10//! The parser ports the logic that previously lived in
11//! `drivers/rust/src/connect.rs` (which keeps a thin re-export layer
12//! for backwards compatibility while drivers migrate over). Cluster
13//! URIs (`grpc://primary,replica:port`), default ports per scheme,
14//! and the `?route=primary` override behave identically to the
15//! original.
16
17use std::path::PathBuf;
18
19use url::Url;
20
21/// Stable error code for parser failures.
22///
23/// Mirrors the `ErrorCode` shape used by the language drivers so that
24/// downstream wrappers can map 1:1 without information loss.
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum ParseErrorKind {
27    /// The input was empty.
28    Empty,
29    /// `url::Url` rejected the string, or a transport-specific
30    /// invariant (missing host, empty cluster entry, bad port…) was
31    /// violated.
32    InvalidUri,
33    /// The scheme is not in the documented vocabulary.
34    UnsupportedScheme,
35    /// A DoS guardrail in [`ConnStringLimits`] was tripped.
36    /// `message` carries the limit name + the offending value so
37    /// downstream wrappers can surface the structured detail.
38    LimitExceeded,
39}
40
41impl ParseErrorKind {
42    pub fn as_str(self) -> &'static str {
43        match self {
44            ParseErrorKind::Empty => "EMPTY",
45            ParseErrorKind::InvalidUri => "INVALID_URI",
46            ParseErrorKind::UnsupportedScheme => "UNSUPPORTED_SCHEME",
47            ParseErrorKind::LimitExceeded => "LIMIT_EXCEEDED",
48        }
49    }
50}
51
52/// Error returned by [`parse`].
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct ParseError {
55    pub kind: ParseErrorKind,
56    pub message: String,
57}
58
59impl ParseError {
60    pub fn new(kind: ParseErrorKind, message: impl Into<String>) -> Self {
61        Self {
62            kind,
63            message: message.into(),
64        }
65    }
66}
67
68impl std::fmt::Display for ParseError {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        write!(f, "{}: {}", self.kind.as_str(), self.message)
71    }
72}
73
74impl std::error::Error for ParseError {}
75
76/// Default port per documented scheme. Centralised so other crates
77/// (the connector, server-side dispatch) can stay consistent.
78pub const DEFAULT_PORT_RED: u16 = 5050;
79pub const DEFAULT_PORT_GRPC: u16 = 55055;
80pub const DEFAULT_PORT_GRPCS: u16 = 55555;
81/// Default ports for `red+ws://` and `red+wss://` — align with the
82/// standard WS / WSS browser defaults (80 and 443) so a hosted endpoint
83/// like `*.db.reddb.io` works without an explicit port.
84pub const DEFAULT_PORT_WS: u16 = 80;
85pub const DEFAULT_PORT_WSS: u16 = 443;
86
87/// DoS guardrails applied by [`parse`] before any URI work happens.
88///
89/// The connection-string parser is the only entry point an attacker
90/// can reach BEFORE auth, so every limit here is enforced eagerly
91/// and surfaces as a structured [`ParseErrorKind::LimitExceeded`]
92/// error rather than a panic, hang, or unbounded allocation.
93#[derive(Debug, Clone, Copy, PartialEq, Eq)]
94pub struct ConnStringLimits {
95    /// Maximum length of the input URI in bytes. Default `8 KiB`.
96    pub max_uri_bytes: usize,
97    /// Maximum number of `key=value` query parameters. Default `32`.
98    pub max_query_params: usize,
99    /// Maximum number of comma-separated cluster hosts allowed in a
100    /// `red://`/`reds://`/`grpc://` cluster URI. Default `64`.
101    pub max_cluster_hosts: usize,
102}
103
104impl Default for ConnStringLimits {
105    fn default() -> Self {
106        Self {
107            max_uri_bytes: 8 * 1024,
108            max_query_params: 32,
109            max_cluster_hosts: 64,
110        }
111    }
112}
113
114/// Normalised target produced by [`parse`].
115///
116/// Variants intentionally mirror the public Rust client target shape
117/// so callers can keep a thin compatibility layer without duplicating
118/// parser behavior.
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub enum ConnectionTarget {
121    /// `memory://` — ephemeral, in-memory backend.
122    Memory,
123    /// `file:///abs/path` — embedded engine on disk.
124    File { path: PathBuf },
125    /// Single remote endpoint over `grpc://` or `grpcs://`. Stored
126    /// as a normalised `http://host:port` string because tonic's
127    /// `Endpoint` consumes that form.
128    Grpc { endpoint: String },
129    /// Multi-host gRPC URI: primary + read replicas. Writes hit the
130    /// primary; reads round-robin across replicas unless
131    /// `force_primary` is set.
132    GrpcCluster {
133        primary: String,
134        replicas: Vec<String>,
135        force_primary: bool,
136    },
137    /// `http://host:port` / `https://host:port` — REST endpoint.
138    Http { base_url: String },
139    /// `red://host:port` (plain TCP) or `reds://host:port` (TLS).
140    /// RedWire binary frame protocol per ADR 0001. The connector
141    /// speaks framed binary directly; it does NOT route through
142    /// tonic.
143    RedWire { host: String, port: u16, tls: bool },
144    /// `red+ws://host:port` (plain WS) or `red+wss://host:port` (WSS).
145    /// Browser-native WebSocket transport (ADR 0047 direct-when-reachable).
146    /// The UI connects directly — no local RedWire-over-TCP bridge needed.
147    WsNative { host: String, port: u16, tls: bool },
148}
149
150/// Parse a connection URI into a [`ConnectionTarget`] under the
151/// default DoS limits.
152///
153/// Pure function, no side effects. Behaviour matches
154/// `drivers/rust/src/connect.rs::parse` 1:1 with two additions:
155///   - Mixed-case schemes (e.g. `Red://`, `REDS://`) are normalised
156///     to lowercase before dispatch.
157///   - Inputs exceeding [`ConnStringLimits`] return a structured
158///     [`ParseErrorKind::LimitExceeded`] error instead of being
159///     processed.
160pub fn parse(uri: &str) -> Result<ConnectionTarget, ParseError> {
161    parse_with_limits(uri, ConnStringLimits::default())
162}
163
164/// Return true for documented embedded aliases that must not resolve to
165/// a remote transport target.
166///
167/// This is intentionally separate from [`parse`]: legacy clients may need
168/// to reject embedded targets before mapping `red://host` onto a remote
169/// compatibility transport.
170pub fn is_embedded_connection_uri(uri: &str) -> bool {
171    let trimmed = uri.trim();
172    matches!(
173        trimmed,
174        "red://" | "red:" | "red:///" | "red://:memory" | "red://:memory:"
175    ) || trimmed.starts_with("red:///")
176}
177
178/// Same as [`parse`] but with caller-supplied DoS guardrails.
179/// Useful for tests that need tighter limits or for callers (a
180/// future admin tool, an offline validator) that need to relax the
181/// defaults.
182pub fn parse_with_limits(
183    uri: &str,
184    limits: ConnStringLimits,
185) -> Result<ConnectionTarget, ParseError> {
186    if uri.is_empty() {
187        return Err(ParseError::new(
188            ParseErrorKind::Empty,
189            "empty connection string",
190        ));
191    }
192
193    if uri.len() > limits.max_uri_bytes {
194        return Err(ParseError::new(
195            ParseErrorKind::LimitExceeded,
196            format!(
197                "max_uri_bytes exceeded: limit={} actual={}",
198                limits.max_uri_bytes,
199                uri.len(),
200            ),
201        ));
202    }
203
204    // Lowercase the scheme so `Red://Host`, `REDS://Host`, etc.
205    // dispatch identically to the canonical lowercase forms. The
206    // host and path retain original casing — host is downcased by
207    // `url::Url` for IDN per spec, path stays verbatim.
208    let normalised = normalise_scheme(uri);
209    let uri = normalised.as_str();
210
211    if uri == "memory://" || uri == "memory:" {
212        return Ok(ConnectionTarget::Memory);
213    }
214
215    if let Some(rest) = uri.strip_prefix("file://") {
216        if rest.is_empty() {
217            return Err(ParseError::new(
218                ParseErrorKind::InvalidUri,
219                "file:// URI is missing a path",
220            ));
221        }
222        return Ok(ConnectionTarget::File {
223            path: PathBuf::from(rest),
224        });
225    }
226
227    if let Some(cluster) = try_parse_grpc_cluster(uri, &limits)? {
228        return Ok(cluster);
229    }
230
231    let parsed = Url::parse(uri)
232        .map_err(|e| ParseError::new(ParseErrorKind::InvalidUri, format!("{e}: {uri}")))?;
233
234    enforce_query_param_limit(&parsed, &limits)?;
235
236    match parsed.scheme() {
237        "red" | "reds" => {
238            let host = parsed.host_str().ok_or_else(|| {
239                ParseError::new(ParseErrorKind::InvalidUri, "red:// URI is missing a host")
240            })?;
241            let port = parsed.port().unwrap_or(DEFAULT_PORT_RED);
242            Ok(ConnectionTarget::RedWire {
243                host: host.to_string(),
244                port,
245                tls: parsed.scheme() == "reds",
246            })
247        }
248        "red+ws" | "red+wss" => {
249            let host = parsed.host_str().ok_or_else(|| {
250                ParseError::new(
251                    ParseErrorKind::InvalidUri,
252                    "red+ws(s):// URI is missing a host",
253                )
254            })?;
255            let tls = parsed.scheme() == "red+wss";
256            let port = parsed.port().unwrap_or(if tls {
257                DEFAULT_PORT_WSS
258            } else {
259                DEFAULT_PORT_WS
260            });
261            Ok(ConnectionTarget::WsNative {
262                host: host.to_string(),
263                port,
264                tls,
265            })
266        }
267        "grpc" | "grpcs" => {
268            let host = parsed.host_str().ok_or_else(|| {
269                ParseError::new(ParseErrorKind::InvalidUri, "grpc:// URI is missing a host")
270            })?;
271            let port = parsed.port().unwrap_or_else(|| {
272                if parsed.scheme() == "grpcs" {
273                    DEFAULT_PORT_GRPCS
274                } else {
275                    DEFAULT_PORT_GRPC
276                }
277            });
278            Ok(ConnectionTarget::Grpc {
279                endpoint: format!("http://{host}:{port}"),
280            })
281        }
282        "http" | "https" => {
283            let host = parsed.host_str().ok_or_else(|| {
284                ParseError::new(
285                    ParseErrorKind::InvalidUri,
286                    "http(s):// URI is missing a host",
287                )
288            })?;
289            let scheme = parsed.scheme();
290            let port = parsed
291                .port()
292                .unwrap_or(if scheme == "https" { 443 } else { 80 });
293            Ok(ConnectionTarget::Http {
294                base_url: format!("{scheme}://{host}:{port}"),
295            })
296        }
297        other => Err(ParseError::new(
298            ParseErrorKind::UnsupportedScheme,
299            format!("unsupported scheme: {other}"),
300        )),
301    }
302}
303
304/// Lowercase only the scheme portion (everything before the first
305/// `:`), leaving host/path/query untouched. Returns the original
306/// string when no scheme separator is present so the downstream
307/// `Url::parse` path produces the canonical "missing scheme" error
308/// instead of being masked here.
309fn normalise_scheme(uri: &str) -> String {
310    match uri.find(':') {
311        Some(i) => {
312            let scheme = &uri[..i];
313            // Only ASCII alphanumerics + `+ . -` are valid scheme
314            // bytes per RFC 3986. If the prefix violates that we
315            // leave it alone so `Url::parse` can produce the
316            // structured error.
317            if scheme.is_empty()
318                || !scheme
319                    .bytes()
320                    .all(|b| b.is_ascii_alphanumeric() || b == b'+' || b == b'.' || b == b'-')
321            {
322                return uri.to_string();
323            }
324            let mut out = String::with_capacity(uri.len());
325            out.push_str(&scheme.to_ascii_lowercase());
326            out.push_str(&uri[i..]);
327            out
328        }
329        None => uri.to_string(),
330    }
331}
332
333fn enforce_query_param_limit(url: &Url, limits: &ConnStringLimits) -> Result<(), ParseError> {
334    let Some(q) = url.query() else {
335        return Ok(());
336    };
337    if q.is_empty() {
338        return Ok(());
339    }
340    let count = q.split('&').count();
341    if count > limits.max_query_params {
342        return Err(ParseError::new(
343            ParseErrorKind::LimitExceeded,
344            format!(
345                "max_query_params exceeded: limit={} actual={}",
346                limits.max_query_params, count,
347            ),
348        ));
349    }
350    Ok(())
351}
352
353/// Try to parse a multi-host gRPC URI. `Ok(None)` means "this is a
354/// single-host URI — fall through to the standard parser".
355fn try_parse_grpc_cluster(
356    uri: &str,
357    limits: &ConnStringLimits,
358) -> Result<Option<ConnectionTarget>, ParseError> {
359    let (rest, default_port) = if let Some(r) = uri.strip_prefix("grpc://") {
360        (r, DEFAULT_PORT_GRPC)
361    } else if let Some(r) = uri.strip_prefix("grpcs://") {
362        (r, DEFAULT_PORT_GRPCS)
363    } else if let Some(r) = uri
364        .strip_prefix("red://")
365        .or_else(|| uri.strip_prefix("reds://"))
366    {
367        (r, DEFAULT_PORT_RED)
368    } else {
369        return Ok(None);
370    };
371
372    let (host_part, query_part) = match rest.find('?') {
373        Some(i) => (&rest[..i], Some(&rest[i + 1..])),
374        None => (rest, None),
375    };
376
377    if !host_part.contains(',') {
378        return Ok(None);
379    }
380
381    let raw_count = host_part.split(',').count();
382    if raw_count > limits.max_cluster_hosts {
383        return Err(ParseError::new(
384            ParseErrorKind::LimitExceeded,
385            format!(
386                "max_cluster_hosts exceeded: limit={} actual={}",
387                limits.max_cluster_hosts, raw_count,
388            ),
389        ));
390    }
391
392    let mut endpoints: Vec<String> = Vec::with_capacity(raw_count);
393    for raw in host_part.split(',') {
394        let raw = raw.trim();
395        if raw.is_empty() {
396            return Err(ParseError::new(
397                ParseErrorKind::InvalidUri,
398                "grpc cluster URI has an empty host entry",
399            ));
400        }
401        // Bracketed IPv6 literal: `[::1]:5050` or `[::1]`.
402        let (host, port) = if let Some(after_bracket) = raw.strip_prefix('[') {
403            let end = after_bracket.find(']').ok_or_else(|| {
404                ParseError::new(
405                    ParseErrorKind::InvalidUri,
406                    format!("unterminated IPv6 bracket in cluster URI: {raw}"),
407                )
408            })?;
409            let host = &after_bracket[..end];
410            let tail = &after_bracket[end + 1..];
411            let port = if tail.is_empty() {
412                default_port
413            } else if let Some(p) = tail.strip_prefix(':') {
414                p.parse::<u16>().map_err(|_| {
415                    ParseError::new(
416                        ParseErrorKind::InvalidUri,
417                        format!("invalid port in cluster URI: {raw}"),
418                    )
419                })?
420            } else {
421                return Err(ParseError::new(
422                    ParseErrorKind::InvalidUri,
423                    format!("trailing junk after IPv6 bracket in cluster URI: {raw}"),
424                ));
425            };
426            (format!("[{host}]"), port)
427        } else {
428            match raw.rsplit_once(':') {
429                Some((h, p)) => {
430                    let port: u16 = p.parse().map_err(|_| {
431                        ParseError::new(
432                            ParseErrorKind::InvalidUri,
433                            format!("invalid port in cluster URI: {raw}"),
434                        )
435                    })?;
436                    (h.to_string(), port)
437                }
438                None => (raw.to_string(), default_port),
439            }
440        };
441        if host.is_empty() || host == "[]" {
442            return Err(ParseError::new(
443                ParseErrorKind::InvalidUri,
444                "grpc cluster URI has an empty host entry",
445            ));
446        }
447        endpoints.push(format!("http://{host}:{port}"));
448    }
449
450    if let Some(q) = query_part {
451        let qcount = if q.is_empty() {
452            0
453        } else {
454            q.split('&').count()
455        };
456        if qcount > limits.max_query_params {
457            return Err(ParseError::new(
458                ParseErrorKind::LimitExceeded,
459                format!(
460                    "max_query_params exceeded: limit={} actual={}",
461                    limits.max_query_params, qcount,
462                ),
463            ));
464        }
465    }
466
467    let force_primary = query_part
468        .map(|q| {
469            q.split('&').any(|kv| {
470                let mut parts = kv.splitn(2, '=');
471                let k = parts.next().unwrap_or("");
472                let v = parts.next().unwrap_or("");
473                k.eq_ignore_ascii_case("route") && v.eq_ignore_ascii_case("primary")
474            })
475        })
476        .unwrap_or(false);
477
478    let mut iter = endpoints.into_iter();
479    let primary = iter.next().expect("split on ',' yields at least one entry");
480    let replicas: Vec<String> = iter.collect();
481
482    Ok(Some(ConnectionTarget::GrpcCluster {
483        primary,
484        replicas,
485        force_primary,
486    }))
487}