brk_server 0.3.0-beta.2

A server with an API for anything from BRK
Documentation
use std::net::SocketAddr;

use axum::{
    Extension,
    body::{Body, Bytes},
    extract::{Query, State},
    http::{HeaderMap, Uri},
    response::Response,
};
use brk_error::Result as BrkResult;
use brk_query::{Query as BrkQuery, ResolvedQuery};
use brk_types::{Format, Output, SeriesOutput, SeriesSelection};

use crate::{
    Result,
    api::series::{CACHE_CONTROL, max_weight},
    extended::{ContentEncoding, HeaderMapExtended, ResponseExtended},
};

use super::AppState;

pub async fn handler(
    uri: Uri,
    headers: HeaderMap,
    addr: Extension<SocketAddr>,
    Query(params): Query<SeriesSelection>,
    state: State<AppState>,
) -> Result<Response> {
    format_and_respond(uri, headers, addr, params, state, |q, r| q.format(r)).await
}

pub async fn raw_handler(
    uri: Uri,
    headers: HeaderMap,
    addr: Extension<SocketAddr>,
    Query(params): Query<SeriesSelection>,
    state: State<AppState>,
) -> Result<Response> {
    format_and_respond(uri, headers, addr, params, state, |q, r| q.format_raw(r)).await
}

async fn format_and_respond(
    uri: Uri,
    headers: HeaderMap,
    Extension(addr): Extension<SocketAddr>,
    params: SeriesSelection,
    state: State<AppState>,
    formatter: fn(&BrkQuery, ResolvedQuery) -> BrkResult<SeriesOutput>,
) -> Result<Response> {
    // Phase 1: Search and resolve metadata (cheap)
    let resolved = state
        .run(move |q| q.resolve(params, max_weight(&addr)))
        .await?;

    let format = resolved.format();
    let etag = resolved.etag();
    let csv_filename = resolved.csv_filename();

    if headers.has_etag(etag.as_str()) {
        return Ok(Response::new_not_modified(&etag, CACHE_CONTROL));
    }

    // Phase 2: Format (expensive, server-side cached)
    let encoding = ContentEncoding::negotiate(&headers);
    let cache_key = format!(
        "single-{}{}{}-{}",
        uri.path(),
        uri.query().unwrap_or(""),
        etag,
        encoding.as_str()
    );
    let query = &state;
    let bytes = state
        .get_or_insert(&cache_key, async move {
            query
                .run(move |q| {
                    let out = formatter(q, resolved)?;
                    let raw = match out.output {
                        Output::CSV(s) => Bytes::from(s),
                        Output::Json(v) => Bytes::from(v),
                    };
                    Ok(encoding.compress(raw))
                })
                .await
        })
        .await?;

    let mut response = Response::new(Body::from(bytes));
    let h = response.headers_mut();
    h.insert_etag(etag.as_str());
    h.insert_cache_control(CACHE_CONTROL);
    h.insert_content_encoding(encoding);
    match format {
        Format::CSV => {
            h.insert_content_disposition_attachment(&csv_filename);
            h.insert_content_type_text_csv();
        }
        Format::JSON => h.insert_content_type_application_json(),
    }

    Ok(response)
}