1use std::{path::Path, sync::Arc};
2
3use argon2::{Argon2, PasswordHash, PasswordVerifier};
4use axum::{
5 Json,
6 body::Body,
7 extract::State,
8 http::{Request, StatusCode, header},
9 middleware::Next,
10 response::{IntoResponse, Response},
11};
12use detritus_protocol::schema::{SchemaError, SchemaKind};
13use serde::Deserialize;
14use serde_json::json;
15use subtle::ConstantTimeEq;
16use tokio::fs;
17
18use crate::{
19 metrics::Metrics,
20 rate_limit::RateLimitConfig,
21 schemas::{ProjectSchemaEntry, SchemaRegistry},
22 storage::SourceKey,
23};
24
25#[derive(Debug, Clone)]
26pub(crate) struct TokenContext {
27 pub(crate) id: String,
28 pub(crate) project: String,
29 pub(crate) source_prefix: String,
30 secret_hash: String,
31}
32
33impl TokenContext {
34 pub(crate) fn permits(&self, source: &SourceKey) -> bool {
35 source
36 .project
37 .as_bytes()
38 .ct_eq(self.project.as_bytes())
39 .into()
40 && source.canonical().starts_with(&self.source_prefix)
41 }
42
43 fn verify(&self, presented: &str) -> bool {
44 let Ok(hash) = PasswordHash::new(&self.secret_hash) else {
45 return false;
46 };
47 Argon2::default()
48 .verify_password(presented.as_bytes(), &hash)
49 .is_ok()
50 }
51}
52
53#[derive(Debug, Clone)]
55pub struct TokenStore {
56 tokens: Arc<Vec<TokenContext>>,
57}
58
59impl TokenStore {
60 pub async fn load(path: &Path) -> Result<Self, AuthConfigError> {
62 let raw = fs::read_to_string(path).await?;
63 let config: TokensConfig = toml::from_str(&raw)?;
64 let mut tokens = Vec::with_capacity(config.token.len());
65 for token in config.token {
66 PasswordHash::new(&token.secret).map_err(|error| AuthConfigError::InvalidHash {
67 id: token.id.clone(),
68 message: error.to_string(),
69 })?;
70 tokens.push(TokenContext {
71 id: token.id,
72 secret_hash: token.secret,
73 project: token.project,
74 source_prefix: token.source_prefix,
75 });
76 }
77 if tokens.is_empty() {
78 return Err(AuthConfigError::NoTokens);
79 }
80 Ok(Self {
81 tokens: Arc::new(tokens),
82 })
83 }
84
85 pub fn for_tests(tokens: Vec<TestToken>) -> Self {
87 Self {
88 tokens: Arc::new(
89 tokens
90 .into_iter()
91 .map(|token| TokenContext {
92 id: token.id,
93 secret_hash: token.secret_hash,
94 project: token.project,
95 source_prefix: token.source_prefix,
96 })
97 .collect(),
98 ),
99 }
100 }
101
102 pub(crate) fn authenticate(&self, presented: &str) -> Option<TokenContext> {
103 self.tokens
104 .iter()
105 .find(|token| token.verify(presented))
106 .cloned()
107 }
108}
109
110#[derive(Debug, Clone)]
112pub struct SecurityConfig {
113 pub token_store: TokenStore,
115 pub rate_limit: RateLimitConfig,
117 pub schema_registry: SchemaRegistry,
119}
120
121pub async fn load_security_config(path: &Path) -> Result<SecurityConfig, AuthConfigError> {
128 let raw = fs::read_to_string(path).await?;
129 let config: TokensConfig = toml::from_str(&raw)?;
130
131 let known_projects: std::collections::HashSet<&str> =
134 config.token.iter().map(|t| t.project.as_str()).collect();
135
136 let config_dir = path.parent().unwrap_or_else(|| Path::new("."));
139
140 let mut schema_entries: Vec<ProjectSchemaEntry> = Vec::new();
142 for entry in &config.schema {
143 if !known_projects.contains(entry.project.as_str()) {
144 return Err(AuthConfigError::SchemaProjectMismatch {
145 project: entry.project.clone(),
146 kind: entry.kind,
147 });
148 }
149 schema_entries.push(ProjectSchemaEntry {
150 project: entry.project.clone(),
151 kind: entry.kind,
152 path: config_dir.join(&entry.path),
154 });
155 }
156
157 let token_store = TokenStore::from_entries(config.token)?;
158 let schema_registry = SchemaRegistry::load(&schema_entries).await?;
159 Ok(SecurityConfig {
160 token_store,
161 rate_limit: config.rate_limit.unwrap_or_default(),
162 schema_registry,
163 })
164}
165
166#[derive(Debug, Clone)]
168pub struct TestToken {
169 pub id: String,
171 pub secret_hash: String,
173 pub project: String,
175 pub source_prefix: String,
177}
178
179#[derive(Debug, Deserialize)]
180struct TokensConfig {
181 #[serde(default)]
182 token: Vec<TokenEntry>,
183 rate_limit: Option<RateLimitConfig>,
184 #[serde(default)]
188 schema: Vec<SchemaEntry>,
189}
190
191#[derive(Debug, Deserialize)]
192struct TokenEntry {
193 id: String,
194 secret: String,
195 project: String,
196 source_prefix: String,
197}
198
199#[derive(Debug, Deserialize)]
201struct SchemaEntry {
202 project: String,
204 kind: SchemaKind,
206 path: std::path::PathBuf,
208}
209
210#[derive(Debug, thiserror::Error)]
212pub enum AuthConfigError {
213 #[error("token config I/O error: {0}")]
215 Io(#[from] std::io::Error),
216 #[error("token config TOML error: {0}")]
218 Toml(#[from] toml::de::Error),
219 #[error("token config contains no tokens")]
221 NoTokens,
222 #[error("token `{id}` has an invalid Argon2 hash: {message}")]
224 InvalidHash {
225 id: String,
227 message: String,
229 },
230 #[error(
232 "schema entry for project `{project}` / kind `{kind:?}` does not match any token project"
233 )]
234 SchemaProjectMismatch {
235 project: String,
237 kind: SchemaKind,
239 },
240 #[error("schema error: {0}")]
242 Schema(#[from] SchemaError),
243}
244
245impl TokenStore {
246 fn from_entries(entries: Vec<TokenEntry>) -> Result<Self, AuthConfigError> {
247 let mut tokens = Vec::with_capacity(entries.len());
248 for token in entries {
249 PasswordHash::new(&token.secret).map_err(|error| AuthConfigError::InvalidHash {
250 id: token.id.clone(),
251 message: error.to_string(),
252 })?;
253 tokens.push(TokenContext {
254 id: token.id,
255 secret_hash: token.secret,
256 project: token.project,
257 source_prefix: token.source_prefix,
258 });
259 }
260 if tokens.is_empty() {
261 return Err(AuthConfigError::NoTokens);
262 }
263 Ok(Self {
264 tokens: Arc::new(tokens),
265 })
266 }
267}
268
269#[derive(Clone)]
270pub(crate) struct AuthState {
271 pub token_store: TokenStore,
272 pub metrics: Metrics,
273}
274
275pub(crate) async fn auth_middleware(
276 State(state): State<AuthState>,
277 mut request: Request<Body>,
278 next: Next,
279) -> Response {
280 let path = request.uri().path();
281 if path == "/healthz" || path == "/metrics" {
282 return next.run(request).await;
283 }
284
285 let endpoint = endpoint_label(path);
286 let started = std::time::Instant::now();
287 let Some(header_value) = request.headers().get(header::AUTHORIZATION) else {
288 state
289 .metrics
290 .observe_request(endpoint, "401", started.elapsed());
291 return auth_error(StatusCode::UNAUTHORIZED, "missing bearer token");
292 };
293 let Ok(header_value) = header_value.to_str() else {
294 state
295 .metrics
296 .observe_request(endpoint, "401", started.elapsed());
297 return auth_error(
298 StatusCode::UNAUTHORIZED,
299 "authorization header is not UTF-8",
300 );
301 };
302 let Some(token) = header_value.strip_prefix("Bearer ") else {
303 state
304 .metrics
305 .observe_request(endpoint, "401", started.elapsed());
306 return auth_error(
307 StatusCode::UNAUTHORIZED,
308 "authorization header must use Bearer",
309 );
310 };
311 let Some(context) = state.token_store.authenticate(token) else {
312 state
313 .metrics
314 .observe_request(endpoint, "401", started.elapsed());
315 return auth_error(StatusCode::UNAUTHORIZED, "invalid bearer token");
316 };
317
318 request.extensions_mut().insert(context);
319 next.run(request).await
320}
321
322#[allow(clippy::result_large_err)]
323pub(crate) fn token_from_extensions(
324 extensions: &axum::http::Extensions,
325) -> Result<TokenContext, tonic::Status> {
326 extensions
327 .get::<TokenContext>()
328 .cloned()
329 .ok_or_else(|| tonic::Status::unauthenticated("missing token context"))
330}
331
332pub(crate) fn endpoint_label(path: &str) -> &'static str {
333 if path == "/v1/crashes" {
334 "crashes"
335 } else if path == "/opentelemetry.proto.collector.logs.v1.LogsService/Export" {
336 "logs"
337 } else if path == "/metrics" {
338 "metrics"
339 } else if path == "/healthz" {
340 "healthz"
341 } else {
342 "other"
343 }
344}
345
346fn auth_error(status: StatusCode, message: &str) -> Response {
347 (
348 status,
349 Json(json!({
350 "error": {
351 "code": status.as_u16(),
352 "message": message,
353 }
354 })),
355 )
356 .into_response()
357}