statespace_tool_runtime/
security.rs1use crate::error::Error;
6use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
7
8pub fn validate_url_initial(url: &str) -> Result<reqwest::Url, Error> {
12 let parsed =
13 reqwest::Url::parse(url).map_err(|e| Error::InvalidCommand(format!("Invalid URL: {e}")))?;
14
15 if parsed.scheme() != "http" && parsed.scheme() != "https" {
16 return Err(Error::Security(format!(
17 "Only http/https schemes allowed, got: {}",
18 parsed.scheme()
19 )));
20 }
21
22 let host = parsed
23 .host_str()
24 .ok_or_else(|| Error::InvalidCommand("URL must have a host".into()))?;
25
26 if is_localhost_name(host) {
27 return Err(Error::Security(format!(
28 "Access to localhost is not allowed: {host}"
29 )));
30 }
31
32 if is_metadata_service(host) {
33 return Err(Error::Security(format!(
34 "Access to metadata service blocked: {host}"
35 )));
36 }
37
38 if let Ok(ip) = host.parse::<IpAddr>() {
39 if is_private_or_restricted_ip(&ip) {
40 return Err(Error::Security(format!(
41 "Access to private/restricted IP blocked: {ip}"
42 )));
43 }
44 }
45
46 Ok(parsed)
47}
48
/// Returns `true` when `host` is a name for the local machine.
///
/// Matching ignores ASCII case and a single trailing root dot (`localhost.`).
/// Besides `localhost` and `localhost.localdomain`, any name under the
/// `.localhost` special-use domain (e.g. `api.localhost`) is treated as local,
/// since such names are reserved for loopback resolution.
fn is_localhost_name(host: &str) -> bool {
    // A fully-qualified name may carry a trailing dot; it names the same host.
    let name = host.strip_suffix('.').unwrap_or(host).to_ascii_lowercase();

    matches!(name.as_str(), "localhost" | "localhost.localdomain")
        || name.ends_with(".localhost")
}
55
/// Returns `true` when `host` is a known cloud metadata-service endpoint.
///
/// Recognises the link-local metadata address `169.254.169.254` and the
/// `metadata.google.internal` host name. The comparison ignores ASCII case and
/// a single trailing root dot so that equivalent spellings of the same host
/// cannot slip past the check.
fn is_metadata_service(host: &str) -> bool {
    let name = host.strip_suffix('.').unwrap_or(host);
    name == "169.254.169.254" || name.eq_ignore_ascii_case("metadata.google.internal")
}
59
60#[must_use]
61pub fn is_private_or_restricted_ip(ip: &IpAddr) -> bool {
62 match ip {
63 IpAddr::V4(ipv4) => is_private_ipv4(*ipv4),
64 IpAddr::V6(ipv6) => is_private_ipv6(ipv6),
65 }
66}
67
/// Returns `true` when `ip` is an IPv4 address that is not a public unicast
/// destination and therefore must not be contacted.
///
/// Covers the private (RFC 1918), loopback, link-local, broadcast,
/// documentation and unspecified ranges, plus:
/// - `0.0.0.0/8` ("this network"),
/// - `100.64.0.0/10` (shared address space / carrier-grade NAT),
/// - `198.18.0.0/15` (benchmarking),
/// - `224.0.0.0/4` (multicast),
/// - `240.0.0.0/4` (reserved).
const fn is_private_ipv4(ip: Ipv4Addr) -> bool {
    let [a, b, _, _] = ip.octets();

    ip.is_private()
        || ip.is_loopback()
        || ip.is_link_local()
        || ip.is_broadcast()
        || ip.is_documentation()
        || ip.is_unspecified()
        || ip.is_multicast()
        // 0.0.0.0/8: "this network", never a valid remote destination.
        || a == 0
        // 100.64.0.0/10: shared address space, routed only inside provider networks.
        || (a == 100 && (b & 0xc0) == 0x40)
        // 198.18.0.0/15: reserved for network benchmarking.
        || (a == 198 && (b & 0xfe) == 18)
        // 240.0.0.0/4: reserved for future use (also contains the broadcast address).
        || a >= 240
}
76
77fn is_private_ipv6(ip: &Ipv6Addr) -> bool {
78 ip.is_loopback()
79 || ip.is_unspecified()
80 || ip.is_unique_local()
81 || ip.is_unicast_link_local()
82 || ip.is_multicast()
83 || is_ipv6_site_local(ip)
84 || is_ipv4_mapped_private(ip)
85}
86
/// Reports whether `ip` lies in the site-local prefix `fec0::/10`, i.e. its
/// first 16-bit segment is between `0xfec0` and `0xfeff` inclusive.
fn is_ipv6_site_local(ip: &Ipv6Addr) -> bool {
    // Keep only the top ten bits of the leading segment and compare them
    // against the `fec0::/10` prefix.
    const PREFIX_MASK: u16 = 0xffc0;
    const SITE_LOCAL_PREFIX: u16 = 0xfec0;

    let [leading, ..] = ip.segments();
    leading & PREFIX_MASK == SITE_LOCAL_PREFIX
}
91
92const fn is_ipv4_mapped_private(ip: &Ipv6Addr) -> bool {
93 if let Some(mapped) = ip.to_ipv4_mapped() {
94 is_private_ipv4(mapped)
95 } else {
96 false
97 }
98}
99
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Asserts that `url` passes the initial validation.
    #[track_caller]
    fn expect_allowed(url: &str) {
        assert!(validate_url_initial(url).is_ok());
    }

    /// Asserts that `url` is rejected with a security error.
    #[track_caller]
    fn expect_security_error(url: &str) {
        let outcome = validate_url_initial(url);
        assert!(matches!(outcome, Err(Error::Security(_))));
    }

    /// Parses an IPv4 literal used as a test fixture.
    fn v4(literal: &str) -> Ipv4Addr {
        literal.parse().unwrap()
    }

    /// Parses an IPv6 literal used as a test fixture.
    fn v6(literal: &str) -> Ipv6Addr {
        literal.parse().unwrap()
    }

    #[test]
    fn test_validate_url_allows_https() {
        expect_allowed("https://example.com");
        expect_allowed("https://api.github.com/repos");
    }

    #[test]
    fn test_validate_url_allows_http() {
        expect_allowed("http://example.com");
    }

    #[test]
    fn test_validate_url_blocks_ftp() {
        expect_security_error("ftp://example.com");
    }

    #[test]
    fn test_validate_url_blocks_file() {
        expect_security_error("file:///etc/passwd");
    }

    #[test]
    fn test_validate_url_blocks_localhost() {
        expect_security_error("http://localhost");
        expect_security_error("https://localhost:8080");
    }

    #[test]
    fn test_validate_url_blocks_metadata_service() {
        expect_security_error("http://169.254.169.254");
        expect_security_error("http://metadata.google.internal");
    }

    #[test]
    fn test_ipv4_blocks_private() {
        assert!(is_private_ipv4(v4("10.0.0.1")));
        assert!(is_private_ipv4(v4("172.16.0.1")));
        assert!(is_private_ipv4(v4("192.168.1.1")));
        assert!(is_private_ipv4(v4("127.0.0.1")));
    }

    #[test]
    fn test_ipv4_allows_public() {
        assert!(!is_private_ipv4(v4("1.1.1.1")));
        assert!(!is_private_ipv4(v4("8.8.8.8")));
    }

    #[test]
    fn test_ipv6_blocks_loopback() {
        assert!(is_private_ipv6(&v6("::1")));
    }

    #[test]
    fn test_ipv6_blocks_unique_local() {
        assert!(is_private_ipv6(&v6("fc00::1")));
        assert!(is_private_ipv6(&v6("fd00::1")));
    }
}