1use std::path::PathBuf;
18
19use url::Url;
20
/// Coarse category of a connection-string parse failure.
///
/// The kind is cheap to copy and compare; the human-readable detail lives in
/// the accompanying error message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParseErrorKind {
    /// The connection string was empty.
    Empty,
    /// The string could not be parsed as a URI, or a required component
    /// (host, path, port) was missing or malformed.
    InvalidUri,
    /// The URI parsed, but its scheme is not one the parser handles.
    UnsupportedScheme,
    /// One of the configured size limits was exceeded.
    LimitExceeded,
}
40
41impl ParseErrorKind {
42 pub fn as_str(self) -> &'static str {
43 match self {
44 ParseErrorKind::Empty => "EMPTY",
45 ParseErrorKind::InvalidUri => "INVALID_URI",
46 ParseErrorKind::UnsupportedScheme => "UNSUPPORTED_SCHEME",
47 ParseErrorKind::LimitExceeded => "LIMIT_EXCEEDED",
48 }
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct ParseError {
55 pub kind: ParseErrorKind,
56 pub message: String,
57}
58
59impl ParseError {
60 pub fn new(kind: ParseErrorKind, message: impl Into<String>) -> Self {
61 Self {
62 kind,
63 message: message.into(),
64 }
65 }
66}
67
68impl std::fmt::Display for ParseError {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 write!(f, "{}: {}", self.kind.as_str(), self.message)
71 }
72}
73
74impl std::error::Error for ParseError {}
75
/// Port assumed for `red://` and `reds://` URIs that do not specify one.
pub const DEFAULT_PORT_RED: u16 = 5050;
/// Port assumed for `grpc://` URIs that do not specify one.
pub const DEFAULT_PORT_GRPC: u16 = 55055;
/// Port assumed for `grpcs://` URIs that do not specify one.
pub const DEFAULT_PORT_GRPCS: u16 = 55555;
/// Port assumed for `red+ws://` URIs that do not specify one.
pub const DEFAULT_PORT_WS: u16 = 80;
/// Port assumed for `red+wss://` URIs that do not specify one.
pub const DEFAULT_PORT_WSS: u16 = 443;
86
/// Size limits applied while parsing a connection string.
///
/// Exceeding any of them yields a `LimitExceeded` parse error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConnStringLimits {
    /// Maximum length of the whole URI, in bytes.
    pub max_uri_bytes: usize,
    /// Maximum number of `&`-separated segments in the query string.
    pub max_query_params: usize,
    /// Maximum number of comma-separated host entries in a cluster URI.
    pub max_cluster_hosts: usize,
}
103
104impl Default for ConnStringLimits {
105 fn default() -> Self {
106 Self {
107 max_uri_bytes: 8 * 1024,
108 max_query_params: 32,
109 max_cluster_hosts: 64,
110 }
111 }
112}
113
/// What a successfully parsed connection string points at.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionTarget {
    /// Produced by `memory://` and `memory:`.
    Memory,
    /// Produced by `file://<path>`; `path` is everything after the prefix.
    File { path: PathBuf },
    /// Single gRPC endpoint, stored as `http://<host>:<port>`.
    Grpc { endpoint: String },
    /// Comma-separated multi-host URI. Every endpoint is stored as
    /// `http://<host>:<port>`.
    GrpcCluster {
        /// First host listed in the URI.
        primary: String,
        /// Remaining hosts, in the order they were listed.
        replicas: Vec<String>,
        /// `true` when the query string contains `route=primary`
        /// (compared case-insensitively).
        force_primary: bool,
    },
    /// HTTP(S) endpoint, stored as `<scheme>://<host>:<port>`.
    Http { base_url: String },
    /// Produced by `red://` (`tls: false`) and `reds://` (`tls: true`).
    RedWire { host: String, port: u16, tls: bool },
    /// Produced by `red+ws://` (`tls: false`) and `red+wss://` (`tls: true`).
    WsNative { host: String, port: u16, tls: bool },
}
149
150pub fn parse(uri: &str) -> Result<ConnectionTarget, ParseError> {
161 parse_with_limits(uri, ConnStringLimits::default())
162}
163
/// Reports whether `uri` is one of the host-less `red:` forms.
///
/// After trimming surrounding whitespace, the accepted forms are `red:`,
/// `red://`, `red://:memory`, `red://:memory:` and anything beginning with
/// `red:///`. The comparison is case-sensitive.
pub fn is_embedded_connection_uri(uri: &str) -> bool {
    // `red:///` on its own is covered by the prefix test below.
    const EXACT_FORMS: [&str; 4] = ["red://", "red:", "red://:memory", "red://:memory:"];

    let candidate = uri.trim();
    candidate.starts_with("red:///") || EXACT_FORMS.contains(&candidate)
}
177
178pub fn parse_with_limits(
183 uri: &str,
184 limits: ConnStringLimits,
185) -> Result<ConnectionTarget, ParseError> {
186 if uri.is_empty() {
187 return Err(ParseError::new(
188 ParseErrorKind::Empty,
189 "empty connection string",
190 ));
191 }
192
193 if uri.len() > limits.max_uri_bytes {
194 return Err(ParseError::new(
195 ParseErrorKind::LimitExceeded,
196 format!(
197 "max_uri_bytes exceeded: limit={} actual={}",
198 limits.max_uri_bytes,
199 uri.len(),
200 ),
201 ));
202 }
203
204 let normalised = normalise_scheme(uri);
209 let uri = normalised.as_str();
210
211 if uri == "memory://" || uri == "memory:" {
212 return Ok(ConnectionTarget::Memory);
213 }
214
215 if let Some(rest) = uri.strip_prefix("file://") {
216 if rest.is_empty() {
217 return Err(ParseError::new(
218 ParseErrorKind::InvalidUri,
219 "file:// URI is missing a path",
220 ));
221 }
222 return Ok(ConnectionTarget::File {
223 path: PathBuf::from(rest),
224 });
225 }
226
227 if let Some(cluster) = try_parse_grpc_cluster(uri, &limits)? {
228 return Ok(cluster);
229 }
230
231 let parsed = Url::parse(uri)
232 .map_err(|e| ParseError::new(ParseErrorKind::InvalidUri, format!("{e}: {uri}")))?;
233
234 enforce_query_param_limit(&parsed, &limits)?;
235
236 match parsed.scheme() {
237 "red" | "reds" => {
238 let host = parsed.host_str().ok_or_else(|| {
239 ParseError::new(ParseErrorKind::InvalidUri, "red:// URI is missing a host")
240 })?;
241 let port = parsed.port().unwrap_or(DEFAULT_PORT_RED);
242 Ok(ConnectionTarget::RedWire {
243 host: host.to_string(),
244 port,
245 tls: parsed.scheme() == "reds",
246 })
247 }
248 "red+ws" | "red+wss" => {
249 let host = parsed.host_str().ok_or_else(|| {
250 ParseError::new(
251 ParseErrorKind::InvalidUri,
252 "red+ws(s):// URI is missing a host",
253 )
254 })?;
255 let tls = parsed.scheme() == "red+wss";
256 let port = parsed.port().unwrap_or(if tls {
257 DEFAULT_PORT_WSS
258 } else {
259 DEFAULT_PORT_WS
260 });
261 Ok(ConnectionTarget::WsNative {
262 host: host.to_string(),
263 port,
264 tls,
265 })
266 }
267 "grpc" | "grpcs" => {
268 let host = parsed.host_str().ok_or_else(|| {
269 ParseError::new(ParseErrorKind::InvalidUri, "grpc:// URI is missing a host")
270 })?;
271 let port = parsed.port().unwrap_or_else(|| {
272 if parsed.scheme() == "grpcs" {
273 DEFAULT_PORT_GRPCS
274 } else {
275 DEFAULT_PORT_GRPC
276 }
277 });
278 Ok(ConnectionTarget::Grpc {
279 endpoint: format!("http://{host}:{port}"),
280 })
281 }
282 "http" | "https" => {
283 let host = parsed.host_str().ok_or_else(|| {
284 ParseError::new(
285 ParseErrorKind::InvalidUri,
286 "http(s):// URI is missing a host",
287 )
288 })?;
289 let scheme = parsed.scheme();
290 let port = parsed
291 .port()
292 .unwrap_or(if scheme == "https" { 443 } else { 80 });
293 Ok(ConnectionTarget::Http {
294 base_url: format!("{scheme}://{host}:{port}"),
295 })
296 }
297 other => Err(ParseError::new(
298 ParseErrorKind::UnsupportedScheme,
299 format!("unsupported scheme: {other}"),
300 )),
301 }
302}
303
/// Returns a copy of `uri` with its scheme (the text before the first `:`)
/// converted to ASCII lower case.
///
/// The input is returned unchanged when it has no `:`, when the scheme is
/// empty, or when the scheme contains any byte other than an ASCII letter,
/// an ASCII digit, `+`, `.` or `-`.
fn normalise_scheme(uri: &str) -> String {
    let mut out = uri.to_string();

    if let Some(colon) = uri.find(':') {
        let scheme = &uri.as_bytes()[..colon];
        let well_formed = !scheme.is_empty()
            && scheme
                .iter()
                .all(|&b| b.is_ascii_alphanumeric() || matches!(b, b'+' | b'.' | b'-'));
        if well_formed {
            // The scheme is pure ASCII here, so lower-casing in place cannot
            // change its length.
            out[..colon].make_ascii_lowercase();
        }
    }

    out
}
332
333fn enforce_query_param_limit(url: &Url, limits: &ConnStringLimits) -> Result<(), ParseError> {
334 let Some(q) = url.query() else {
335 return Ok(());
336 };
337 if q.is_empty() {
338 return Ok(());
339 }
340 let count = q.split('&').count();
341 if count > limits.max_query_params {
342 return Err(ParseError::new(
343 ParseErrorKind::LimitExceeded,
344 format!(
345 "max_query_params exceeded: limit={} actual={}",
346 limits.max_query_params, count,
347 ),
348 ));
349 }
350 Ok(())
351}
352
353fn try_parse_grpc_cluster(
356 uri: &str,
357 limits: &ConnStringLimits,
358) -> Result<Option<ConnectionTarget>, ParseError> {
359 let (rest, default_port) = if let Some(r) = uri.strip_prefix("grpc://") {
360 (r, DEFAULT_PORT_GRPC)
361 } else if let Some(r) = uri.strip_prefix("grpcs://") {
362 (r, DEFAULT_PORT_GRPCS)
363 } else if let Some(r) = uri
364 .strip_prefix("red://")
365 .or_else(|| uri.strip_prefix("reds://"))
366 {
367 (r, DEFAULT_PORT_RED)
368 } else {
369 return Ok(None);
370 };
371
372 let (host_part, query_part) = match rest.find('?') {
373 Some(i) => (&rest[..i], Some(&rest[i + 1..])),
374 None => (rest, None),
375 };
376
377 if !host_part.contains(',') {
378 return Ok(None);
379 }
380
381 let raw_count = host_part.split(',').count();
382 if raw_count > limits.max_cluster_hosts {
383 return Err(ParseError::new(
384 ParseErrorKind::LimitExceeded,
385 format!(
386 "max_cluster_hosts exceeded: limit={} actual={}",
387 limits.max_cluster_hosts, raw_count,
388 ),
389 ));
390 }
391
392 let mut endpoints: Vec<String> = Vec::with_capacity(raw_count);
393 for raw in host_part.split(',') {
394 let raw = raw.trim();
395 if raw.is_empty() {
396 return Err(ParseError::new(
397 ParseErrorKind::InvalidUri,
398 "grpc cluster URI has an empty host entry",
399 ));
400 }
401 let (host, port) = if let Some(after_bracket) = raw.strip_prefix('[') {
403 let end = after_bracket.find(']').ok_or_else(|| {
404 ParseError::new(
405 ParseErrorKind::InvalidUri,
406 format!("unterminated IPv6 bracket in cluster URI: {raw}"),
407 )
408 })?;
409 let host = &after_bracket[..end];
410 let tail = &after_bracket[end + 1..];
411 let port = if tail.is_empty() {
412 default_port
413 } else if let Some(p) = tail.strip_prefix(':') {
414 p.parse::<u16>().map_err(|_| {
415 ParseError::new(
416 ParseErrorKind::InvalidUri,
417 format!("invalid port in cluster URI: {raw}"),
418 )
419 })?
420 } else {
421 return Err(ParseError::new(
422 ParseErrorKind::InvalidUri,
423 format!("trailing junk after IPv6 bracket in cluster URI: {raw}"),
424 ));
425 };
426 (format!("[{host}]"), port)
427 } else {
428 match raw.rsplit_once(':') {
429 Some((h, p)) => {
430 let port: u16 = p.parse().map_err(|_| {
431 ParseError::new(
432 ParseErrorKind::InvalidUri,
433 format!("invalid port in cluster URI: {raw}"),
434 )
435 })?;
436 (h.to_string(), port)
437 }
438 None => (raw.to_string(), default_port),
439 }
440 };
441 if host.is_empty() || host == "[]" {
442 return Err(ParseError::new(
443 ParseErrorKind::InvalidUri,
444 "grpc cluster URI has an empty host entry",
445 ));
446 }
447 endpoints.push(format!("http://{host}:{port}"));
448 }
449
450 if let Some(q) = query_part {
451 let qcount = if q.is_empty() {
452 0
453 } else {
454 q.split('&').count()
455 };
456 if qcount > limits.max_query_params {
457 return Err(ParseError::new(
458 ParseErrorKind::LimitExceeded,
459 format!(
460 "max_query_params exceeded: limit={} actual={}",
461 limits.max_query_params, qcount,
462 ),
463 ));
464 }
465 }
466
467 let force_primary = query_part
468 .map(|q| {
469 q.split('&').any(|kv| {
470 let mut parts = kv.splitn(2, '=');
471 let k = parts.next().unwrap_or("");
472 let v = parts.next().unwrap_or("");
473 k.eq_ignore_ascii_case("route") && v.eq_ignore_ascii_case("primary")
474 })
475 })
476 .unwrap_or(false);
477
478 let mut iter = endpoints.into_iter();
479 let primary = iter.next().expect("split on ',' yields at least one entry");
480 let replicas: Vec<String> = iter.collect();
481
482 Ok(Some(ConnectionTarget::GrpcCluster {
483 primary,
484 replicas,
485 force_primary,
486 }))
487}