1use std::path::PathBuf;
18
19use url::Url;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum ParseErrorKind {
27 Empty,
29 InvalidUri,
33 UnsupportedScheme,
35 LimitExceeded,
39}
40
41impl ParseErrorKind {
42 pub fn as_str(self) -> &'static str {
43 match self {
44 ParseErrorKind::Empty => "EMPTY",
45 ParseErrorKind::InvalidUri => "INVALID_URI",
46 ParseErrorKind::UnsupportedScheme => "UNSUPPORTED_SCHEME",
47 ParseErrorKind::LimitExceeded => "LIMIT_EXCEEDED",
48 }
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct ParseError {
55 pub kind: ParseErrorKind,
56 pub message: String,
57}
58
59impl ParseError {
60 pub fn new(kind: ParseErrorKind, message: impl Into<String>) -> Self {
61 Self {
62 kind,
63 message: message.into(),
64 }
65 }
66}
67
68impl std::fmt::Display for ParseError {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 write!(f, "{}: {}", self.kind.as_str(), self.message)
71 }
72}
73
74impl std::error::Error for ParseError {}
75
76pub const DEFAULT_PORT_RED: u16 = 5050;
79pub const DEFAULT_PORT_GRPC: u16 = 5055;
80pub const DEFAULT_PORT_WS: u16 = 80;
84pub const DEFAULT_PORT_WSS: u16 = 443;
85
86#[derive(Debug, Clone, Copy, PartialEq, Eq)]
93pub struct ConnStringLimits {
94 pub max_uri_bytes: usize,
96 pub max_query_params: usize,
98 pub max_cluster_hosts: usize,
101}
102
103impl Default for ConnStringLimits {
104 fn default() -> Self {
105 Self {
106 max_uri_bytes: 8 * 1024,
107 max_query_params: 32,
108 max_cluster_hosts: 64,
109 }
110 }
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
119pub enum ConnectionTarget {
120 Memory,
122 File { path: PathBuf },
124 Grpc { endpoint: String },
128 GrpcCluster {
132 primary: String,
133 replicas: Vec<String>,
134 force_primary: bool,
135 },
136 Http { base_url: String },
138 RedWire { host: String, port: u16, tls: bool },
143 WsNative { host: String, port: u16, tls: bool },
147}
148
149pub fn parse(uri: &str) -> Result<ConnectionTarget, ParseError> {
160 parse_with_limits(uri, ConnStringLimits::default())
161}
162
163pub fn is_embedded_connection_uri(uri: &str) -> bool {
170 let trimmed = uri.trim();
171 matches!(
172 trimmed,
173 "red://" | "red:" | "red:///" | "red://:memory" | "red://:memory:"
174 ) || trimmed.starts_with("red:///")
175}
176
177pub fn parse_with_limits(
182 uri: &str,
183 limits: ConnStringLimits,
184) -> Result<ConnectionTarget, ParseError> {
185 if uri.is_empty() {
186 return Err(ParseError::new(
187 ParseErrorKind::Empty,
188 "empty connection string",
189 ));
190 }
191
192 if uri.len() > limits.max_uri_bytes {
193 return Err(ParseError::new(
194 ParseErrorKind::LimitExceeded,
195 format!(
196 "max_uri_bytes exceeded: limit={} actual={}",
197 limits.max_uri_bytes,
198 uri.len(),
199 ),
200 ));
201 }
202
203 let normalised = normalise_scheme(uri);
208 let uri = normalised.as_str();
209
210 if uri == "memory://" || uri == "memory:" {
211 return Ok(ConnectionTarget::Memory);
212 }
213
214 if let Some(rest) = uri.strip_prefix("file://") {
215 if rest.is_empty() {
216 return Err(ParseError::new(
217 ParseErrorKind::InvalidUri,
218 "file:// URI is missing a path",
219 ));
220 }
221 return Ok(ConnectionTarget::File {
222 path: PathBuf::from(rest),
223 });
224 }
225
226 if let Some(cluster) = try_parse_grpc_cluster(uri, &limits)? {
227 return Ok(cluster);
228 }
229
230 let parsed = Url::parse(uri)
231 .map_err(|e| ParseError::new(ParseErrorKind::InvalidUri, format!("{e}: {uri}")))?;
232
233 enforce_query_param_limit(&parsed, &limits)?;
234
235 match parsed.scheme() {
236 "red" | "reds" => {
237 let host = parsed.host_str().ok_or_else(|| {
238 ParseError::new(ParseErrorKind::InvalidUri, "red:// URI is missing a host")
239 })?;
240 let port = parsed.port().unwrap_or(DEFAULT_PORT_RED);
241 Ok(ConnectionTarget::RedWire {
242 host: host.to_string(),
243 port,
244 tls: parsed.scheme() == "reds",
245 })
246 }
247 "red+ws" | "red+wss" => {
248 let host = parsed.host_str().ok_or_else(|| {
249 ParseError::new(
250 ParseErrorKind::InvalidUri,
251 "red+ws(s):// URI is missing a host",
252 )
253 })?;
254 let tls = parsed.scheme() == "red+wss";
255 let port = parsed.port().unwrap_or(if tls {
256 DEFAULT_PORT_WSS
257 } else {
258 DEFAULT_PORT_WS
259 });
260 Ok(ConnectionTarget::WsNative {
261 host: host.to_string(),
262 port,
263 tls,
264 })
265 }
266 "grpc" | "grpcs" => {
267 let host = parsed.host_str().ok_or_else(|| {
268 ParseError::new(ParseErrorKind::InvalidUri, "grpc:// URI is missing a host")
269 })?;
270 let port = parsed.port().unwrap_or(DEFAULT_PORT_GRPC);
271 Ok(ConnectionTarget::Grpc {
272 endpoint: format!("http://{host}:{port}"),
273 })
274 }
275 "http" | "https" => {
276 let host = parsed.host_str().ok_or_else(|| {
277 ParseError::new(
278 ParseErrorKind::InvalidUri,
279 "http(s):// URI is missing a host",
280 )
281 })?;
282 let scheme = parsed.scheme();
283 let port = parsed
284 .port()
285 .unwrap_or(if scheme == "https" { 443 } else { 80 });
286 Ok(ConnectionTarget::Http {
287 base_url: format!("{scheme}://{host}:{port}"),
288 })
289 }
290 other => Err(ParseError::new(
291 ParseErrorKind::UnsupportedScheme,
292 format!("unsupported scheme: {other}"),
293 )),
294 }
295}
296
297fn normalise_scheme(uri: &str) -> String {
303 match uri.find(':') {
304 Some(i) => {
305 let scheme = &uri[..i];
306 if scheme.is_empty()
311 || !scheme
312 .bytes()
313 .all(|b| b.is_ascii_alphanumeric() || b == b'+' || b == b'.' || b == b'-')
314 {
315 return uri.to_string();
316 }
317 let mut out = String::with_capacity(uri.len());
318 out.push_str(&scheme.to_ascii_lowercase());
319 out.push_str(&uri[i..]);
320 out
321 }
322 None => uri.to_string(),
323 }
324}
325
326fn enforce_query_param_limit(url: &Url, limits: &ConnStringLimits) -> Result<(), ParseError> {
327 let Some(q) = url.query() else {
328 return Ok(());
329 };
330 if q.is_empty() {
331 return Ok(());
332 }
333 let count = q.split('&').count();
334 if count > limits.max_query_params {
335 return Err(ParseError::new(
336 ParseErrorKind::LimitExceeded,
337 format!(
338 "max_query_params exceeded: limit={} actual={}",
339 limits.max_query_params, count,
340 ),
341 ));
342 }
343 Ok(())
344}
345
346fn try_parse_grpc_cluster(
349 uri: &str,
350 limits: &ConnStringLimits,
351) -> Result<Option<ConnectionTarget>, ParseError> {
352 let (rest, default_port) = if let Some(r) = uri.strip_prefix("grpc://") {
353 (r, DEFAULT_PORT_GRPC)
354 } else if let Some(r) = uri.strip_prefix("grpcs://") {
355 (r, DEFAULT_PORT_GRPC)
356 } else if let Some(r) = uri
357 .strip_prefix("red://")
358 .or_else(|| uri.strip_prefix("reds://"))
359 {
360 (r, DEFAULT_PORT_RED)
361 } else {
362 return Ok(None);
363 };
364
365 let (host_part, query_part) = match rest.find('?') {
366 Some(i) => (&rest[..i], Some(&rest[i + 1..])),
367 None => (rest, None),
368 };
369
370 if !host_part.contains(',') {
371 return Ok(None);
372 }
373
374 let raw_count = host_part.split(',').count();
375 if raw_count > limits.max_cluster_hosts {
376 return Err(ParseError::new(
377 ParseErrorKind::LimitExceeded,
378 format!(
379 "max_cluster_hosts exceeded: limit={} actual={}",
380 limits.max_cluster_hosts, raw_count,
381 ),
382 ));
383 }
384
385 let mut endpoints: Vec<String> = Vec::with_capacity(raw_count);
386 for raw in host_part.split(',') {
387 let raw = raw.trim();
388 if raw.is_empty() {
389 return Err(ParseError::new(
390 ParseErrorKind::InvalidUri,
391 "grpc cluster URI has an empty host entry",
392 ));
393 }
394 let (host, port) = if let Some(after_bracket) = raw.strip_prefix('[') {
396 let end = after_bracket.find(']').ok_or_else(|| {
397 ParseError::new(
398 ParseErrorKind::InvalidUri,
399 format!("unterminated IPv6 bracket in cluster URI: {raw}"),
400 )
401 })?;
402 let host = &after_bracket[..end];
403 let tail = &after_bracket[end + 1..];
404 let port = if tail.is_empty() {
405 default_port
406 } else if let Some(p) = tail.strip_prefix(':') {
407 p.parse::<u16>().map_err(|_| {
408 ParseError::new(
409 ParseErrorKind::InvalidUri,
410 format!("invalid port in cluster URI: {raw}"),
411 )
412 })?
413 } else {
414 return Err(ParseError::new(
415 ParseErrorKind::InvalidUri,
416 format!("trailing junk after IPv6 bracket in cluster URI: {raw}"),
417 ));
418 };
419 (format!("[{host}]"), port)
420 } else {
421 match raw.rsplit_once(':') {
422 Some((h, p)) => {
423 let port: u16 = p.parse().map_err(|_| {
424 ParseError::new(
425 ParseErrorKind::InvalidUri,
426 format!("invalid port in cluster URI: {raw}"),
427 )
428 })?;
429 (h.to_string(), port)
430 }
431 None => (raw.to_string(), default_port),
432 }
433 };
434 if host.is_empty() || host == "[]" {
435 return Err(ParseError::new(
436 ParseErrorKind::InvalidUri,
437 "grpc cluster URI has an empty host entry",
438 ));
439 }
440 endpoints.push(format!("http://{host}:{port}"));
441 }
442
443 if let Some(q) = query_part {
444 let qcount = if q.is_empty() {
445 0
446 } else {
447 q.split('&').count()
448 };
449 if qcount > limits.max_query_params {
450 return Err(ParseError::new(
451 ParseErrorKind::LimitExceeded,
452 format!(
453 "max_query_params exceeded: limit={} actual={}",
454 limits.max_query_params, qcount,
455 ),
456 ));
457 }
458 }
459
460 let force_primary = query_part
461 .map(|q| {
462 q.split('&').any(|kv| {
463 let mut parts = kv.splitn(2, '=');
464 let k = parts.next().unwrap_or("");
465 let v = parts.next().unwrap_or("");
466 k.eq_ignore_ascii_case("route") && v.eq_ignore_ascii_case("primary")
467 })
468 })
469 .unwrap_or(false);
470
471 let mut iter = endpoints.into_iter();
472 let primary = iter.next().expect("split on ',' yields at least one entry");
473 let replicas: Vec<String> = iter.collect();
474
475 Ok(Some(ConnectionTarget::GrpcCluster {
476 primary,
477 replicas,
478 force_primary,
479 }))
480}