Skip to main content

mii_http/
server.rs

1//! HTTP server runtime built on top of axum.
2
3use crate::exec::{self, BodyValue, ExecContext, ExecOutput, FormFieldValue};
4use crate::spec::*;
5use crate::value::{self, ValidationError};
6use axum::{
7    Router,
8    body::{Body, Bytes},
9    extract::{DefaultBodyLimit, Path as AxPath, Query, State},
10    http::{HeaderMap, StatusCode, header},
11    response::{IntoResponse, Response},
12    routing::{MethodFilter, MethodRouter},
13};
14use futures_util::stream::{Stream, StreamExt};
15use std::collections::{BTreeMap, HashMap};
16use std::net::SocketAddr;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::net::TcpListener;
21
22#[derive(Clone)]
23struct AppState {
24    spec: Arc<Spec>,
25    auth_secret: Option<Vec<u8>>,
26    auth_jwt_verifier: Option<String>,
27    dry_run: bool,
28}
29
30pub async fn serve(spec: Spec, addr: SocketAddr, dry_run: bool) -> std::io::Result<()> {
31    tracing::debug!(addr = %addr, dry_run, endpoints = spec.endpoints.len(), "server::serve");
32    let auth_secret = match &spec.setup.token_secret {
33        Some(src) => Some(resolve_static_source(src)?.into_bytes()),
34        None => None,
35    };
36    let auth_jwt_verifier = match &spec.setup.jwt_verifier {
37        Some(src) => Some(resolve_static_source(src)?),
38        None => None,
39    };
40    let state = AppState {
41        spec: Arc::new(spec),
42        auth_secret,
43        auth_jwt_verifier,
44        dry_run,
45    };
46    let router = build_router(state.clone());
47    let listener = TcpListener::bind(addr).await?;
48    if dry_run {
49        tracing::info!(
50            "mii-http listening on {} (dry-run: commands will not be executed)",
51            addr
52        );
53    } else {
54        tracing::info!("mii-http listening on {}", addr);
55    }
56    axum::serve(listener, router.into_make_service())
57        .await
58        .map_err(|e| std::io::Error::other(e.to_string()))
59}
60
61fn resolve_static_source(src: &ValueSource) -> std::io::Result<String> {
62    match src {
63        ValueSource::Env { name, .. } => std::env::var(name)
64            .map_err(|_| std::io::Error::other(format!("env var `{}` not set", name))),
65        ValueSource::Literal { value, .. } => Ok(value.clone()),
66        ValueSource::Header { .. } => Err(std::io::Error::other(
67            "[HEADER ...] is not valid for static setup values",
68        )),
69    }
70}
71
72fn build_router(state: AppState) -> Router {
73    tracing::debug!("server::build_router");
74    let mut routes: HashMap<String, MethodRouter<AppState>> = HashMap::new();
75    let prefix = compute_prefix(&state.spec.setup);
76    let body_limit = state.spec.setup.max_body_size.map(saturating_usize);
77
78    for (idx, ep) in state.spec.endpoints.iter().enumerate() {
79        let path = format!("{}{}", prefix, axum_path(&ep.path_segments));
80        tracing::debug!(method = ep.method.as_str(), path = %path, "server::build_router: mounting route");
81        let entry = routes.entry(path).or_default();
82        let idx_clone = idx;
83        let mr = MethodRouter::<AppState>::new().on(
84            method_filter(ep.method),
85            move |s: State<AppState>,
86                  p: AxPath<HashMap<String, String>>,
87                  q: Query<HashMap<String, String>>,
88                  h: HeaderMap,
89                  b: Bytes| handle(s, p, q, h, b, idx_clone),
90        );
91        let merged = std::mem::take(entry).merge(mr);
92        *entry = merged;
93    }
94
95    let mut router = Router::new();
96    for (path, mr) in routes {
97        router = router.route(&path, mr);
98    }
99    let router = router.with_state(state);
100    if let Some(limit) = body_limit {
101        router.layer(DefaultBodyLimit::max(limit))
102    } else {
103        router
104    }
105}
106
107fn saturating_usize(n: u64) -> usize {
108    usize::try_from(n).unwrap_or(usize::MAX)
109}
110
111fn method_filter(m: Method) -> MethodFilter {
112    match m {
113        Method::Get => MethodFilter::GET,
114        Method::Post => MethodFilter::POST,
115        Method::Put => MethodFilter::PUT,
116        Method::Delete => MethodFilter::DELETE,
117        Method::Patch => MethodFilter::PATCH,
118    }
119}
120
121fn compute_prefix(setup: &Setup) -> String {
122    let base = setup.base.clone().unwrap_or_default();
123    let version = setup
124        .version
125        .map(|v| format!("/v{}", v))
126        .unwrap_or_default();
127    format!("{}{}", base, version)
128}
129
130fn axum_path(segs: &[PathSegment]) -> String {
131    let mut out = String::new();
132    for seg in segs {
133        out.push('/');
134        match seg {
135            PathSegment::Literal(s) => out.push_str(s),
136            PathSegment::Param { name, .. } => {
137                out.push(':');
138                out.push_str(name);
139            }
140        }
141    }
142    if out.is_empty() { "/".into() } else { out }
143}
144
145async fn handle(
146    State(state): State<AppState>,
147    AxPath(path): AxPath<HashMap<String, String>>,
148    Query(query): Query<HashMap<String, String>>,
149    headers: HeaderMap,
150    body: Bytes,
151    endpoint_idx: usize,
152) -> Response {
153    let ep = match state.spec.endpoints.get(endpoint_idx) {
154        Some(e) => e,
155        None => return error_response(StatusCode::INTERNAL_SERVER_ERROR, "endpoint missing"),
156    };
157    tracing::info!(method = ep.method.as_str(), path = %ep.path, "server::handle: incoming request");
158    match handle_inner(&state, ep, path, query, headers, body).await {
159        Ok(r) => r,
160        Err(err) => {
161            tracing::warn!(method = ep.method.as_str(), path = %ep.path, status = %err.status, error = %err.message, "server::handle: returning error");
162            err.into_response()
163        }
164    }
165}
166
167async fn handle_inner(
168    state: &AppState,
169    ep: &Endpoint,
170    path: HashMap<String, String>,
171    query: HashMap<String, String>,
172    headers: HeaderMap,
173    body: Bytes,
174) -> Result<Response, HandlerError> {
175    let setup = &state.spec.setup;
176
177    enforce_body_size(setup, &body)?;
178    authenticate(state, &headers)?;
179
180    let ctx = ExecContext {
181        query: validate_query(setup, ep, &query)?,
182        headers: validate_headers(setup, ep, &headers)?,
183        path: validate_path(ep, &path)?,
184        vars: resolve_vars(setup, ep, &headers)?,
185        body: build_body(ep, &headers, body)?,
186    };
187
188    let timeout = setup.timeout_ms.map(Duration::from_millis);
189
190    if state.dry_run {
191        let preview = exec::preview_pipeline(&ep.exec.statements, &ctx);
192        tracing::info!(
193            method = ep.method.as_str(),
194            path = %ep.path,
195            stages = ?preview,
196            "dry-run: skipping execution",
197        );
198        let mut body_text = String::from("[dry-run] would execute:\n");
199        for stage in &preview {
200            body_text.push_str("  ");
201            body_text.push_str(stage);
202            body_text.push('\n');
203        }
204        let mut resp = Response::new(body_text.into());
205        resp.headers_mut().insert(
206            header::CONTENT_TYPE,
207            header::HeaderValue::from_static("text/plain; charset=utf-8"),
208        );
209        return Ok(resp);
210    }
211
212    let content_type = ep
213        .response_type
214        .clone()
215        .filter(|s| !s.is_empty())
216        .unwrap_or_else(|| "text/plain; charset=utf-8".into());
217
218    if ep.response_stream {
219        return run_streaming(ep, ctx, timeout, content_type).await;
220    }
221
222    let ExecOutput {
223        status,
224        stdout,
225        stderr,
226    } = exec::run_pipeline(&ep.exec.statements, &ctx, timeout)
227        .await
228        .map_err(|e| HandlerError::new(StatusCode::INTERNAL_SERVER_ERROR, e))?;
229
230    if status != 0 {
231        tracing::warn!(
232            method = ep.method.as_str(),
233            path = %ep.path,
234            status,
235            stderr = %String::from_utf8_lossy(&stderr),
236            "exec returned non-zero"
237        );
238        return Err(HandlerError::new(
239            StatusCode::INTERNAL_SERVER_ERROR,
240            format!("command exited with status {}", status),
241        ));
242    }
243
244    let mut resp = Response::new(stdout.into());
245    resp.headers_mut().insert(
246        header::CONTENT_TYPE,
247        content_type
248            .parse()
249            .unwrap_or_else(|_| header::HeaderValue::from_static("text/plain; charset=utf-8")),
250    );
251    Ok(resp)
252}
253
254async fn run_streaming(
255    ep: &Endpoint,
256    ctx: ExecContext,
257    timeout: Option<Duration>,
258    content_type: String,
259) -> Result<Response, HandlerError> {
260    let streaming = exec::run_pipeline_streaming(&ep.exec.statements, &ctx, timeout)
261        .await
262        .map_err(|e| HandlerError::new(StatusCode::INTERNAL_SERVER_ERROR, e))?;
263    let exec::StreamingExec {
264        stdout_rx,
265        completion,
266    } = streaming;
267    let method_str = ep.method.as_str().to_string();
268    let path_str = ep.path.clone();
269    tokio::spawn(async move {
270        match completion.await {
271            Ok(Ok(c)) if c.status != 0 => tracing::warn!(
272                method = %method_str,
273                path = %path_str,
274                status = c.status,
275                stderr = %String::from_utf8_lossy(&c.stderr),
276                "streaming exec returned non-zero"
277            ),
278            Ok(Err(e)) => tracing::warn!(method = %method_str, path = %path_str, error = %e, "streaming exec failed"),
279            _ => {}
280        }
281    });
282    let stream = chunk_stream(stdout_rx);
283    let body = Body::from_stream(stream);
284    let mut resp = Response::new(body);
285    resp.headers_mut().insert(
286        header::CONTENT_TYPE,
287        content_type
288            .parse()
289            .unwrap_or_else(|_| header::HeaderValue::from_static("text/plain; charset=utf-8")),
290    );
291    Ok(resp)
292}
293
294fn chunk_stream(
295    rx: tokio::sync::mpsc::Receiver<Result<Bytes, String>>,
296) -> Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>> {
297    let s = tokio_stream::wrappers::ReceiverStream::new(rx)
298        .map(|res| res.map_err(std::io::Error::other));
299    Box::pin(s)
300}
301
302fn check_validation(r: Result<(), ValidationError>, scope: &str) -> Result<(), HandlerError> {
303    r.map_err(|e| HandlerError::new(StatusCode::BAD_REQUEST, format!("{}: {}", scope, e.message)))
304}
305
306fn enforce_body_size(setup: &Setup, body: &Bytes) -> Result<(), HandlerError> {
307    if let Some(max) = setup.max_body_size
308        && body.len() as u64 > max
309    {
310        return Err(HandlerError::new(
311            StatusCode::PAYLOAD_TOO_LARGE,
312            format!("body exceeds max size of {} bytes", max),
313        ));
314    }
315    Ok(())
316}
317
318fn authenticate(state: &AppState, headers: &HeaderMap) -> Result<(), HandlerError> {
319    tracing::debug!("server::authenticate");
320    if let Some(AuthSpec::BearerHeader { header: hname, .. }) = &state.spec.setup.auth {
321        let token = extract_bearer(headers, hname, state.spec.setup.max_header_size)?;
322        verify_token(state, &token)?;
323    }
324    Ok(())
325}
326
327fn enforce_size(
328    actual: usize,
329    max: Option<u64>,
330    status: StatusCode,
331    label: impl FnOnce() -> String,
332) -> Result<(), HandlerError> {
333    if let Some(max) = max
334        && actual as u64 > max
335    {
336        return Err(HandlerError::new(status, label()));
337    }
338    Ok(())
339}
340
341fn require_or_optional<T>(
342    found: Option<T>,
343    optional: bool,
344    missing_msg: impl FnOnce() -> String,
345) -> Result<Option<T>, HandlerError> {
346    match found {
347        Some(v) => Ok(Some(v)),
348        None if optional => Ok(None),
349        None => Err(HandlerError::new(StatusCode::BAD_REQUEST, missing_msg())),
350    }
351}
352
353fn validate_query(
354    setup: &Setup,
355    ep: &Endpoint,
356    query: &HashMap<String, String>,
357) -> Result<BTreeMap<String, String>, HandlerError> {
358    tracing::debug!(endpoint = %ep.path, fields = ep.query_params.len(), "server::validate_query");
359    let mut out = BTreeMap::new();
360    for f in &ep.query_params {
361        let v = require_or_optional(query.get(&f.name).cloned(), f.optional, || {
362            format!("missing query parameter `{}`", f.name)
363        })?;
364        if let Some(v) = v {
365            enforce_size(
366                v.len(),
367                setup.max_query_param_size,
368                StatusCode::URI_TOO_LONG,
369                || format!("query param `{}` exceeds max size", f.name),
370            )?;
371            check_validation(
372                value::validate_text(&v, &f.ty),
373                &format!("query `{}`", f.name),
374            )?;
375            out.insert(f.name.clone(), v);
376        }
377    }
378    Ok(out)
379}
380
381fn validate_headers(
382    setup: &Setup,
383    ep: &Endpoint,
384    headers: &HeaderMap,
385) -> Result<BTreeMap<String, String>, HandlerError> {
386    tracing::debug!(endpoint = %ep.path, fields = ep.headers.len(), "server::validate_headers");
387    let mut out = BTreeMap::new();
388    for f in &ep.headers {
389        let v = require_or_optional(header_get(headers, &f.name), f.optional, || {
390            format!("missing header `{}`", f.name)
391        })?;
392        if let Some(v) = v {
393            enforce_size(
394                v.len(),
395                setup.max_header_size,
396                StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE,
397                || format!("header `{}` exceeds max size", f.name),
398            )?;
399            check_validation(
400                value::validate_text(&v, &f.ty),
401                &format!("header `{}`", f.name),
402            )?;
403            out.insert(f.name.clone(), v);
404        }
405    }
406    Ok(out)
407}
408
409fn validate_path(
410    ep: &Endpoint,
411    path: &HashMap<String, String>,
412) -> Result<BTreeMap<String, String>, HandlerError> {
413    tracing::debug!(endpoint = %ep.path, "server::validate_path");
414    let mut out = BTreeMap::new();
415    for seg in &ep.path_segments {
416        if let PathSegment::Param { name, ty, .. } = seg {
417            let v = path.get(name).cloned().ok_or_else(|| {
418                HandlerError::new(
419                    StatusCode::BAD_REQUEST,
420                    format!("missing path param `{}`", name),
421                )
422            })?;
423            check_validation(value::validate_text(&v, ty), &format!("path `{}`", name))?;
424            out.insert(name.clone(), v);
425        }
426    }
427    Ok(out)
428}
429
430fn resolve_vars(
431    setup: &Setup,
432    ep: &Endpoint,
433    headers: &HeaderMap,
434) -> Result<BTreeMap<String, String>, HandlerError> {
435    tracing::debug!(endpoint = %ep.path, vars = ep.vars.len(), "server::resolve_vars");
436    let mut out = BTreeMap::new();
437    for v in &ep.vars {
438        let resolved = resolve_runtime_source(setup, &v.source, headers)?;
439        out.insert(v.name.clone(), resolved);
440    }
441    Ok(out)
442}
443
444fn build_body(ep: &Endpoint, headers: &HeaderMap, body: Bytes) -> Result<BodyValue, HandlerError> {
445    tracing::debug!(endpoint = %ep.path, body_len = body.len(), "server::build_body");
446    Ok(match &ep.body {
447        None => BodyValue::None,
448        Some(BodySpec::String { .. }) => {
449            BodyValue::Text(String::from_utf8(body.to_vec()).map_err(|_| {
450                HandlerError::new(StatusCode::BAD_REQUEST, "body is not valid UTF-8")
451            })?)
452        }
453        Some(BodySpec::Binary { .. }) => BodyValue::Binary(body),
454        Some(BodySpec::Json { schema, .. }) => {
455            let v: serde_json::Value = serde_json::from_slice(&body).map_err(|e| {
456                HandlerError::new(StatusCode::BAD_REQUEST, format!("invalid JSON body: {}", e))
457            })?;
458            if let Some(schema) = schema {
459                check_validation(value::validate_json(&v, schema), "json body")?;
460            }
461            BodyValue::Json(v)
462        }
463        Some(BodySpec::Form { fields, .. }) => {
464            let parsed = parse_form_body(headers, &body, fields)?;
465            for f in fields {
466                let present = parsed.get(&f.name);
467                if present.is_none() && !f.optional {
468                    return Err(HandlerError::new(
469                        StatusCode::BAD_REQUEST,
470                        format!("missing form field `{}`", f.name),
471                    ));
472                }
473                if let Some(FormFieldValue::Text(v)) = present {
474                    check_validation(
475                        value::validate_text(v, &f.ty),
476                        &format!("form field `{}`", f.name),
477                    )?;
478                }
479            }
480            BodyValue::Form(parsed)
481        }
482    })
483}
484
485fn parse_form_body(
486    headers: &HeaderMap,
487    body: &Bytes,
488    fields: &[NamedField],
489) -> Result<BTreeMap<String, FormFieldValue>, HandlerError> {
490    let ct = headers
491        .get(header::CONTENT_TYPE)
492        .and_then(|v| v.to_str().ok())
493        .unwrap_or("");
494    if let Some(boundary) = multipart_boundary(ct) {
495        return parse_multipart(body, &boundary, fields);
496    }
497    let parsed: BTreeMap<String, FormFieldValue> = form_urlencoded::parse(body)
498        .into_owned()
499        .map(|(k, v)| (k, FormFieldValue::Text(v)))
500        .collect();
501    // url-encoded bodies cannot carry binary safely.
502    for f in fields {
503        if matches!(f.ty, TypeExpr::Binary) && parsed.contains_key(&f.name) {
504            return Err(HandlerError::new(
505                StatusCode::UNSUPPORTED_MEDIA_TYPE,
506                format!(
507                    "form field `{}` is binary; use multipart/form-data",
508                    f.name
509                ),
510            ));
511        }
512    }
513    Ok(parsed)
514}
515
516fn multipart_boundary(content_type: &str) -> Option<String> {
517    let lower = content_type.to_ascii_lowercase();
518    if !lower.starts_with("multipart/form-data") {
519        return None;
520    }
521    for part in content_type.split(';').skip(1) {
522        let part = part.trim();
523        if let Some(rest) = part.strip_prefix("boundary=") {
524            let b = rest.trim().trim_matches('"');
525            if !b.is_empty() {
526                return Some(b.to_string());
527            }
528        }
529    }
530    None
531}
532
533fn parse_multipart(
534    body: &Bytes,
535    boundary: &str,
536    fields: &[NamedField],
537) -> Result<BTreeMap<String, FormFieldValue>, HandlerError> {
538    let binary_fields: std::collections::HashSet<&str> = fields
539        .iter()
540        .filter(|f| matches!(f.ty, TypeExpr::Binary))
541        .map(|f| f.name.as_str())
542        .collect();
543    let mut out: BTreeMap<String, FormFieldValue> = BTreeMap::new();
544    for part in split_multipart(body, boundary) {
545        let MultipartPart { name, data } = match part {
546            Ok(p) => p,
547            Err(e) => {
548                return Err(HandlerError::new(
549                    StatusCode::BAD_REQUEST,
550                    format!("invalid multipart body: {}", e),
551                ));
552            }
553        };
554        let Some(name) = name else { continue };
555        if binary_fields.contains(name.as_str()) {
556            out.insert(name, FormFieldValue::Binary(Bytes::copy_from_slice(&data)));
557        } else {
558            let text = String::from_utf8(data).map_err(|_| {
559                HandlerError::new(
560                    StatusCode::BAD_REQUEST,
561                    format!("form field `{}` is not valid UTF-8", name),
562                )
563            })?;
564            out.insert(name, FormFieldValue::Text(text));
565        }
566    }
567    Ok(out)
568}
569
570struct MultipartPart {
571    name: Option<String>,
572    data: Vec<u8>,
573}
574
575/// Minimal multipart/form-data splitter sufficient for typical browser-style
576/// uploads. Returns one entry per part, preserving raw bytes for binary
577/// fields. Errors are surfaced as a string for diagnostics.
578fn split_multipart(body: &[u8], boundary: &str) -> Vec<Result<MultipartPart, String>> {
579    let delim = format!("--{}", boundary);
580    let body_str_lossy = String::from_utf8_lossy(body); // only used to find indices
581    let _ = body_str_lossy; // not needed; do byte search instead
582    let mut parts = Vec::new();
583    let bytes = body;
584    let delim_bytes = delim.as_bytes();
585    // find all delimiter positions
586    let mut positions = Vec::new();
587    let mut i = 0;
588    while i + delim_bytes.len() <= bytes.len() {
589        if &bytes[i..i + delim_bytes.len()] == delim_bytes {
590            positions.push(i);
591            i += delim_bytes.len();
592        } else {
593            i += 1;
594        }
595    }
596    if positions.is_empty() {
597        parts.push(Err("missing multipart boundary".to_string()));
598        return parts;
599    }
600    for win in positions.windows(2) {
601        let start = win[0] + delim_bytes.len();
602        let end = win[1];
603        let segment = &bytes[start..end];
604        // segment is `\r\n<headers>\r\n\r\n<data>\r\n` (or terminator)
605        let segment = segment.strip_prefix(b"\r\n").unwrap_or(segment);
606        let segment = segment.strip_suffix(b"\r\n").unwrap_or(segment);
607        // split headers / body
608        let Some(sep) = find_subseq(segment, b"\r\n\r\n") else {
609            parts.push(Err("malformed multipart segment".to_string()));
610            continue;
611        };
612        let header_bytes = &segment[..sep];
613        let data = segment[sep + 4..].to_vec();
614        let headers = String::from_utf8_lossy(header_bytes);
615        let mut name = None;
616        for line in headers.split("\r\n") {
617            if let Some(rest) = line
618                .to_ascii_lowercase()
619                .strip_prefix("content-disposition:")
620            {
621                let rest_orig = &line["content-disposition:".len()..];
622                let _ = rest;
623                for attr in rest_orig.split(';') {
624                    let attr = attr.trim();
625                    if let Some(v) = attr.strip_prefix("name=") {
626                        name = Some(v.trim().trim_matches('"').to_string());
627                    }
628                }
629            }
630        }
631        parts.push(Ok(MultipartPart { name, data }));
632    }
633    parts
634}
635
636fn find_subseq(haystack: &[u8], needle: &[u8]) -> Option<usize> {
637    haystack
638        .windows(needle.len())
639        .position(|w| w == needle)
640}
641
642fn header_get(headers: &HeaderMap, name: &str) -> Option<String> {
643    headers
644        .get(name)
645        .and_then(|v| v.to_str().ok())
646        .map(|s| s.to_string())
647}
648
649fn extract_bearer(
650    headers: &HeaderMap,
651    header_name: &str,
652    max_header_size: Option<u64>,
653) -> Result<String, HandlerError> {
654    let raw = header_get(headers, header_name).ok_or_else(|| {
655        HandlerError::new(
656            StatusCode::UNAUTHORIZED,
657            format!("missing `{}`", header_name),
658        )
659    })?;
660    enforce_size(
661        raw.len(),
662        max_header_size,
663        StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE,
664        || format!("auth header `{}` exceeds max size", header_name),
665    )?;
666    let token = raw
667        .strip_prefix("Bearer ")
668        .or_else(|| raw.strip_prefix("bearer "))
669        .unwrap_or(&raw)
670        .trim()
671        .to_string();
672    if token.is_empty() {
673        return Err(HandlerError::new(
674            StatusCode::UNAUTHORIZED,
675            "empty bearer token",
676        ));
677    }
678    Ok(token)
679}
680
681fn verify_token(state: &AppState, token: &str) -> Result<(), HandlerError> {
682    if let Some(verifier) = &state.auth_jwt_verifier {
683        use jsonwebtoken::{DecodingKey, Validation, decode};
684        let key = DecodingKey::from_secret(verifier.as_bytes());
685        let mut validation = Validation::default();
686        validation.validate_exp = true;
687        decode::<serde_json::Value>(token, &key, &validation).map_err(|e| {
688            HandlerError::new(StatusCode::UNAUTHORIZED, format!("invalid token: {}", e))
689        })?;
690        return Ok(());
691    }
692    if let Some(secret) = &state.auth_secret {
693        if constant_time_eq(token.as_bytes(), secret) {
694            return Ok(());
695        }
696        return Err(HandlerError::new(StatusCode::UNAUTHORIZED, "invalid token"));
697    }
698    Ok(())
699}
700
701fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
702    if a.len() != b.len() {
703        return false;
704    }
705    let mut diff: u8 = 0;
706    for (x, y) in a.iter().zip(b.iter()) {
707        diff |= x ^ y;
708    }
709    diff == 0
710}
711
712fn resolve_runtime_source(
713    setup: &Setup,
714    src: &ValueSource,
715    headers: &HeaderMap,
716) -> Result<String, HandlerError> {
717    match src {
718        ValueSource::Env { name, .. } => std::env::var(name).map_err(|_| {
719            HandlerError::new(
720                StatusCode::INTERNAL_SERVER_ERROR,
721                format!("env var `{}` not set", name),
722            )
723        }),
724        ValueSource::Header { name, .. } => {
725            let value = header_get(headers, name).ok_or_else(|| {
726                HandlerError::new(
727                    StatusCode::BAD_REQUEST,
728                    format!("missing VAR header `{}`", name),
729                )
730            })?;
731            enforce_size(
732                value.len(),
733                setup.max_header_size,
734                StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE,
735                || format!("VAR header `{}` exceeds max size", name),
736            )?;
737            Ok(value)
738        }
739        ValueSource::Literal { value, .. } => Ok(value.clone()),
740    }
741}
742
743#[derive(Debug)]
744struct HandlerError {
745    status: StatusCode,
746    message: String,
747}
748
749impl HandlerError {
750    fn new(status: StatusCode, msg: impl Into<String>) -> Self {
751        Self {
752            status,
753            message: msg.into(),
754        }
755    }
756}
757
758impl IntoResponse for HandlerError {
759    fn into_response(self) -> Response {
760        error_response(self.status, &self.message)
761    }
762}
763
764fn error_response(status: StatusCode, msg: &str) -> Response {
765    let mut resp = Response::new(format!("{}\n", msg).into());
766    *resp.status_mut() = status;
767    resp.headers_mut().insert(
768        header::CONTENT_TYPE,
769        header::HeaderValue::from_static("text/plain; charset=utf-8"),
770    );
771    resp
772}