Skip to main content

detritus_server/
auth.rs

1use std::{path::Path, sync::Arc};
2
3use argon2::{Argon2, PasswordHash, PasswordVerifier};
4use axum::{
5    Json,
6    body::Body,
7    extract::State,
8    http::{Request, StatusCode, header},
9    middleware::Next,
10    response::{IntoResponse, Response},
11};
12use detritus_protocol::schema::{SchemaError, SchemaKind};
13use serde::Deserialize;
14use serde_json::json;
15use subtle::ConstantTimeEq;
16use tokio::fs;
17
18use crate::{
19    metrics::Metrics,
20    rate_limit::RateLimitConfig,
21    schemas::{ProjectSchemaEntry, SchemaRegistry},
22    storage::SourceKey,
23};
24
25#[derive(Debug, Clone)]
26pub(crate) struct TokenContext {
27    pub(crate) id: String,
28    pub(crate) project: String,
29    pub(crate) source_prefix: String,
30    secret_hash: String,
31}
32
33impl TokenContext {
34    pub(crate) fn permits(&self, source: &SourceKey) -> bool {
35        source
36            .project
37            .as_bytes()
38            .ct_eq(self.project.as_bytes())
39            .into()
40            && source.canonical().starts_with(&self.source_prefix)
41    }
42
43    fn verify(&self, presented: &str) -> bool {
44        let Ok(hash) = PasswordHash::new(&self.secret_hash) else {
45            return false;
46        };
47        Argon2::default()
48            .verify_password(presented.as_bytes(), &hash)
49            .is_ok()
50    }
51}
52
53/// In-memory bearer-token store used by the server.
54#[derive(Debug, Clone)]
55pub struct TokenStore {
56    tokens: Arc<Vec<TokenContext>>,
57}
58
59impl TokenStore {
60    /// Loads a token store from a TOML token configuration file.
61    pub async fn load(path: &Path) -> Result<Self, AuthConfigError> {
62        let raw = fs::read_to_string(path).await?;
63        let config: TokensConfig = toml::from_str(&raw)?;
64        let mut tokens = Vec::with_capacity(config.token.len());
65        for token in config.token {
66            PasswordHash::new(&token.secret).map_err(|error| AuthConfigError::InvalidHash {
67                id: token.id.clone(),
68                message: error.to_string(),
69            })?;
70            tokens.push(TokenContext {
71                id: token.id,
72                secret_hash: token.secret,
73                project: token.project,
74                source_prefix: token.source_prefix,
75            });
76        }
77        if tokens.is_empty() {
78            return Err(AuthConfigError::NoTokens);
79        }
80        Ok(Self {
81            tokens: Arc::new(tokens),
82        })
83    }
84
85    /// Creates a token store from pre-hashed entries for tests and harnesses.
86    pub fn for_tests(tokens: Vec<TestToken>) -> Self {
87        Self {
88            tokens: Arc::new(
89                tokens
90                    .into_iter()
91                    .map(|token| TokenContext {
92                        id: token.id,
93                        secret_hash: token.secret_hash,
94                        project: token.project,
95                        source_prefix: token.source_prefix,
96                    })
97                    .collect(),
98            ),
99        }
100    }
101
102    pub(crate) fn authenticate(&self, presented: &str) -> Option<TokenContext> {
103        self.tokens
104            .iter()
105            .find(|token| token.verify(presented))
106            .cloned()
107    }
108}
109
110/// Security configuration loaded from the token configuration file.
111#[derive(Debug, Clone)]
112pub struct SecurityConfig {
113    /// Bearer-token store used for request authentication.
114    pub token_store: TokenStore,
115    /// Per-token and per-source rate limit configuration.
116    pub rate_limit: RateLimitConfig,
117    /// Per-tenant JSON Schema registry (no-op in Phase 01).
118    pub schema_registry: SchemaRegistry,
119}
120
121/// Loads token and rate-limit configuration from a TOML file.
122///
123/// Schema paths in `[[schema]]` entries are resolved relative to the tokens
124/// config file's parent directory, not relative to the current working
125/// directory.  This prevents deployment breakage when `systemd` (or similar)
126/// starts the daemon from `/`.
127pub async fn load_security_config(path: &Path) -> Result<SecurityConfig, AuthConfigError> {
128    let raw = fs::read_to_string(path).await?;
129    let config: TokensConfig = toml::from_str(&raw)?;
130
131    // Collect the set of project names declared in [[token]] entries so we can
132    // validate that every [[schema]] entry refers to a known project.
133    let known_projects: std::collections::HashSet<&str> =
134        config.token.iter().map(|t| t.project.as_str()).collect();
135
136    // Determine the directory containing the tokens file so schema paths can
137    // be resolved relative to it rather than relative to CWD.
138    let config_dir = path.parent().unwrap_or_else(|| Path::new("."));
139
140    // Build the list of resolved schema entries, checking project membership.
141    let mut schema_entries: Vec<ProjectSchemaEntry> = Vec::new();
142    for entry in &config.schema {
143        if !known_projects.contains(entry.project.as_str()) {
144            return Err(AuthConfigError::SchemaProjectMismatch {
145                project: entry.project.clone(),
146                kind: entry.kind,
147            });
148        }
149        schema_entries.push(ProjectSchemaEntry {
150            project: entry.project.clone(),
151            kind: entry.kind,
152            // Resolve the path relative to the tokens config's parent dir.
153            path: config_dir.join(&entry.path),
154        });
155    }
156
157    let token_store = TokenStore::from_entries(config.token)?;
158    let schema_registry = SchemaRegistry::load(&schema_entries).await?;
159    Ok(SecurityConfig {
160        token_store,
161        rate_limit: config.rate_limit.unwrap_or_default(),
162        schema_registry,
163    })
164}
165
166/// Pre-hashed token entry used by embedded tests and local harnesses.
167#[derive(Debug, Clone)]
168pub struct TestToken {
169    /// Token identifier used in logs and rate-limit keys.
170    pub id: String,
171    /// Argon2 encoded password hash for the bearer token.
172    pub secret_hash: String,
173    /// Project this token may write.
174    pub project: String,
175    /// Canonical source prefix this token may write.
176    pub source_prefix: String,
177}
178
179#[derive(Debug, Deserialize)]
180struct TokensConfig {
181    #[serde(default)]
182    token: Vec<TokenEntry>,
183    rate_limit: Option<RateLimitConfig>,
184    /// Optional per-tenant schema declarations.  Existing tokens.toml files
185    /// without a `[[schema]]` table will deserialise this as an empty `Vec`,
186    /// producing a [`SchemaRegistry::empty()`] registry.
187    #[serde(default)]
188    schema: Vec<SchemaEntry>,
189}
190
191#[derive(Debug, Deserialize)]
192struct TokenEntry {
193    id: String,
194    secret: String,
195    project: String,
196    source_prefix: String,
197}
198
199/// One `[[schema]]` entry from a tokens config file.
200#[derive(Debug, Deserialize)]
201struct SchemaEntry {
202    /// Project this schema applies to; must match a `[[token]].project`.
203    project: String,
204    /// Payload kind (`crash_metadata` or `log_attributes`).
205    kind: SchemaKind,
206    /// Path to the JSON Schema file, resolved relative to the tokens config.
207    path: std::path::PathBuf,
208}
209
210/// Errors returned while loading security configuration.
211#[derive(Debug, thiserror::Error)]
212pub enum AuthConfigError {
213    /// Token configuration file could not be read.
214    #[error("token config I/O error: {0}")]
215    Io(#[from] std::io::Error),
216    /// Token configuration file is not valid TOML.
217    #[error("token config TOML error: {0}")]
218    Toml(#[from] toml::de::Error),
219    /// Token configuration did not contain any token entries.
220    #[error("token config contains no tokens")]
221    NoTokens,
222    /// Token entry contained an invalid Argon2 hash.
223    #[error("token `{id}` has an invalid Argon2 hash: {message}")]
224    InvalidHash {
225        /// Token identifier from the configuration file.
226        id: String,
227        /// Hash parser error message.
228        message: String,
229    },
230    /// A `[[schema]]` entry references a project not declared in `[[token]]`.
231    #[error(
232        "schema entry for project `{project}` / kind `{kind:?}` does not match any token project"
233    )]
234    SchemaProjectMismatch {
235        /// The project name from the `[[schema]]` entry.
236        project: String,
237        /// The schema kind from the entry.
238        kind: SchemaKind,
239    },
240    /// A schema file could not be read or parsed.
241    #[error("schema error: {0}")]
242    Schema(#[from] SchemaError),
243}
244
245impl TokenStore {
246    fn from_entries(entries: Vec<TokenEntry>) -> Result<Self, AuthConfigError> {
247        let mut tokens = Vec::with_capacity(entries.len());
248        for token in entries {
249            PasswordHash::new(&token.secret).map_err(|error| AuthConfigError::InvalidHash {
250                id: token.id.clone(),
251                message: error.to_string(),
252            })?;
253            tokens.push(TokenContext {
254                id: token.id,
255                secret_hash: token.secret,
256                project: token.project,
257                source_prefix: token.source_prefix,
258            });
259        }
260        if tokens.is_empty() {
261            return Err(AuthConfigError::NoTokens);
262        }
263        Ok(Self {
264            tokens: Arc::new(tokens),
265        })
266    }
267}
268
269#[derive(Clone)]
270pub(crate) struct AuthState {
271    pub token_store: TokenStore,
272    pub metrics: Metrics,
273}
274
275pub(crate) async fn auth_middleware(
276    State(state): State<AuthState>,
277    mut request: Request<Body>,
278    next: Next,
279) -> Response {
280    let path = request.uri().path();
281    if path == "/healthz" || path == "/metrics" {
282        return next.run(request).await;
283    }
284
285    let endpoint = endpoint_label(path);
286    let started = std::time::Instant::now();
287    let Some(header_value) = request.headers().get(header::AUTHORIZATION) else {
288        state
289            .metrics
290            .observe_request(endpoint, "401", started.elapsed());
291        return auth_error(StatusCode::UNAUTHORIZED, "missing bearer token");
292    };
293    let Ok(header_value) = header_value.to_str() else {
294        state
295            .metrics
296            .observe_request(endpoint, "401", started.elapsed());
297        return auth_error(
298            StatusCode::UNAUTHORIZED,
299            "authorization header is not UTF-8",
300        );
301    };
302    let Some(token) = header_value.strip_prefix("Bearer ") else {
303        state
304            .metrics
305            .observe_request(endpoint, "401", started.elapsed());
306        return auth_error(
307            StatusCode::UNAUTHORIZED,
308            "authorization header must use Bearer",
309        );
310    };
311    let Some(context) = state.token_store.authenticate(token) else {
312        state
313            .metrics
314            .observe_request(endpoint, "401", started.elapsed());
315        return auth_error(StatusCode::UNAUTHORIZED, "invalid bearer token");
316    };
317
318    request.extensions_mut().insert(context);
319    next.run(request).await
320}
321
322#[allow(clippy::result_large_err)]
323pub(crate) fn token_from_extensions(
324    extensions: &axum::http::Extensions,
325) -> Result<TokenContext, tonic::Status> {
326    extensions
327        .get::<TokenContext>()
328        .cloned()
329        .ok_or_else(|| tonic::Status::unauthenticated("missing token context"))
330}
331
332pub(crate) fn endpoint_label(path: &str) -> &'static str {
333    if path == "/v1/crashes" {
334        "crashes"
335    } else if path == "/opentelemetry.proto.collector.logs.v1.LogsService/Export" {
336        "logs"
337    } else if path == "/metrics" {
338        "metrics"
339    } else if path == "/healthz" {
340        "healthz"
341    } else {
342        "other"
343    }
344}
345
346fn auth_error(status: StatusCode, message: &str) -> Response {
347    (
348        status,
349        Json(json!({
350            "error": {
351                "code": status.as_u16(),
352                "message": message,
353            }
354        })),
355    )
356        .into_response()
357}