// codex-helper-core 0.15.0
//
// Core library for codex-helper.
// Documentation
use std::sync::OnceLock;
use std::time::Instant;

use axum::body::{Body, Bytes, to_bytes};
use axum::http::{HeaderMap, Method, Request, StatusCode, Uri};

use crate::lb::CooldownBackoff;
use crate::logging::{BodyPreview, HeaderEntry, ServiceTierLog};
use crate::routing_ir::RouteRequestContext;
use crate::state::SessionBinding;

use super::ProxyService;
use super::client_identity::{extract_client_addr, extract_client_name, extract_session_id};
use super::headers::header_map_to_entries;
use super::request_failures::{log_client_body_read_error, log_no_routable_station};
use super::request_preparation::{
    RequestFlavor, build_body_previews, detect_request_flavor, prepare_request_body,
};
use super::request_routing::RequestRouteSelection;
use super::retry::{RetryPlan, retry_plan};

/// Everything `prepare_proxy_request` gathered about an incoming client request
/// before it is forwarded upstream.
///
/// Built by `prepare_proxy_request`; when that function returns one, routing has
/// already produced at least one candidate (`route_selection` is non-empty).
pub(super) struct PreparedProxyRequest {
    /// HTTP method of the client request.
    pub(super) method: Method,
    /// Request URI as received from the client.
    pub(super) uri: Uri,
    /// `uri` rendered via `to_string()` (used e.g. in failure logs).
    pub(super) client_uri: String,
    /// Client request headers as received.
    pub(super) client_headers: HeaderMap,
    /// Lazily built `HeaderEntry` form of `client_headers`; may still be empty, so
    /// read it with `get_or_init(|| header_map_to_entries(&client_headers))`.
    pub(super) client_headers_entries_cache: OnceLock<Vec<HeaderEntry>>,
    /// Session id extracted from the client headers, if any.
    pub(super) session_id: Option<String>,
    /// Default session binding ensured for `session_id` (`None` without a session id).
    pub(super) session_binding: Option<SessionBinding>,
    /// Upstream route candidates selected for this request.
    pub(super) route_selection: RequestRouteSelection,
    /// Working directory resolved for the session, if known.
    pub(super) cwd: Option<String>,
    /// Per-session station override; always `None` when a v4 route-graph config is active.
    pub(super) session_override_config: Option<String>,
    /// Global station override; always `None` when a v4 route-graph config is active.
    pub(super) global_station_override: Option<String>,
    /// Session-level reasoning-effort override (`None` without a session id).
    pub(super) override_effort: Option<String>,
    /// Session-level model override (`None` without a session id).
    pub(super) override_model: Option<String>,
    /// Session-level service-tier override (`None` without a session id).
    pub(super) override_service_tier: Option<String>,
    /// Request body after overrides were applied by `prepare_request_body`.
    pub(super) body_for_upstream: Bytes,
    /// Model of the request as determined by `prepare_request_body`.
    pub(super) request_model: Option<String>,
    /// Reasoning effort in effect after overrides (from `prepare_request_body`).
    pub(super) effective_effort: Option<String>,
    /// Copy of `base_service_tier.effective`.
    pub(super) effective_service_tier: Option<String>,
    /// Service-tier log information computed by `prepare_request_body`.
    pub(super) base_service_tier: ServiceTierLog,
    /// Request body length as reported by `prepare_request_body`.
    pub(super) request_body_len: usize,
    /// Request flavor detected from service name, method, headers and path.
    pub(super) request_flavor: RequestFlavor,
    /// Whether request body previews should be logged.
    pub(super) request_body_previews: bool,
    /// Max preview bytes for HTTP debug logging; 0 when debug logging is disabled.
    pub(super) debug_max: usize,
    /// Max preview bytes for HTTP warn logging; 0 when warn logging is disabled.
    pub(super) warn_max: usize,
    /// Preview of the raw client body for debug logs, if one was built.
    pub(super) client_body_debug: Option<BodyPreview>,
    /// Preview of the raw client body for warn logs, if one was built.
    pub(super) client_body_warn: Option<BodyPreview>,
    /// Id returned by `state.begin_request` for this request.
    pub(super) request_id: u64,
    /// Retry plan resolved from the config snapshot.
    pub(super) plan: RetryPlan,
    /// Cooldown backoff parameters taken from `plan`.
    pub(super) cooldown_backoff: CooldownBackoff,
}

/// Upper bound (in bytes) on the client request body buffered in memory. Larger
/// bodies make `to_bytes` fail and are reported through the client-body-read error path.
const MAX_CLIENT_BODY_BYTES: usize = 10 * 1024 * 1024;

/// Reads, normalizes and routes an incoming client request so it can be proxied.
///
/// In order, this:
/// 1. splits the request and extracts client address, session id and client name;
/// 2. reloads the config from disk if needed (pruning runtime observability for the
///    service when a reload happened) and takes config snapshots;
/// 3. ensures a default session binding, resolves the session cwd and touches the
///    session's override state;
/// 4. reads station overrides (skipped entirely when a v4 route-graph config exists);
/// 5. buffers the body (at most [`MAX_CLIENT_BODY_BYTES`]) and applies session and
///    binding effort / model / service-tier overrides via `prepare_request_body`;
/// 6. builds optional body previews for debug/warn logging;
/// 7. selects upstream route candidates and, only once routing succeeded, registers
///    the request with `state.begin_request`.
///
/// `start` measures the elapsed time reported on error paths; `started_at_ms` is the
/// request start timestamp (milliseconds) handed to the state bookkeeping calls.
///
/// # Errors
///
/// Returns the `(StatusCode, String)` produced by `log_client_body_read_error` when the
/// body cannot be read (including when it exceeds the size limit), or by
/// `log_no_routable_station` when no station can serve the request. In both cases the
/// request has not been registered via `begin_request`, so there is no request id the
/// caller would need to finish.
pub(super) async fn prepare_proxy_request(
    proxy: &ProxyService,
    req: Request<Body>,
    start: &Instant,
    started_at_ms: u64,
) -> Result<PreparedProxyRequest, (StatusCode, String)> {
    let (parts, body) = req.into_parts();
    let client_addr = extract_client_addr(&parts.extensions);
    let uri = parts.uri;
    let client_uri = uri.to_string();
    let method = parts.method;
    let client_headers = parts.headers;
    // Filled lazily: converting headers into log entries is only needed on demand.
    let client_headers_entries_cache: OnceLock<Vec<HeaderEntry>> = OnceLock::new();

    let session_id = extract_session_id(&client_headers);
    let client_name = extract_client_name(&client_headers);

    let config_reloaded = proxy.config.maybe_reload_from_disk().await;
    let cfg_snapshot = proxy.config.snapshot().await;
    let v4_snapshot = proxy.config.v4_snapshot().await;
    // With a v4 route graph, station overrides are not consulted.
    let route_graph_config = v4_snapshot.is_some();
    let mgr = proxy.service_manager(cfg_snapshot.as_ref());
    if config_reloaded {
        proxy
            .state
            .prune_runtime_observability_for_service(proxy.service_name, mgr)
            .await;
    }
    let session_binding = if let Some(id) = session_id.as_deref() {
        proxy
            .ensure_default_session_binding(mgr, id, started_at_ms)
            .await
    } else {
        None
    };
    let request_flavor =
        detect_request_flavor(proxy.service_name, &method, &client_headers, uri.path());
    let cwd = resolve_and_touch_session_state(proxy, session_id.as_deref(), started_at_ms).await;
    let session_override_config = if !route_graph_config && let Some(id) = session_id.as_deref() {
        proxy.state.get_session_station_override(id).await
    } else {
        None
    };
    let global_station_override = if route_graph_config {
        None
    } else {
        proxy.state.get_global_station_override().await
    };

    let raw_body = match to_bytes(body, MAX_CLIENT_BODY_BYTES).await {
        Ok(body) => body,
        Err(error) => {
            let dur = start.elapsed().as_millis() as u64;
            let client_headers_entries = client_headers_entries_cache
                .get_or_init(|| header_map_to_entries(&client_headers))
                .clone();
            return Err(log_client_body_read_error(
                super::request_failures::ClientBodyReadErrorParams {
                    proxy,
                    method: &method,
                    path: uri.path(),
                    client_uri: client_uri.as_str(),
                    session_id: session_id.clone(),
                    cwd: cwd.clone(),
                    client_headers: client_headers_entries,
                    duration_ms: dur,
                    error_message: error.to_string(),
                },
            ));
        }
    };

    // Session-level overrides; looked up in the same order as before (effort, model, tier).
    let (override_effort, override_model, override_service_tier) = match session_id.as_deref() {
        Some(id) => (
            proxy.state.get_session_effort_override(id).await,
            proxy.state.get_session_model_override(id).await,
            proxy.state.get_session_service_tier_override(id).await,
        ),
        None => (None, None, None),
    };
    let binding_effort = session_binding
        .as_ref()
        .and_then(|binding| binding.reasoning_effort.as_deref());
    let binding_model = session_binding
        .as_ref()
        .and_then(|binding| binding.model.as_deref());
    let binding_service_tier = session_binding
        .as_ref()
        .and_then(|binding| binding.service_tier.as_deref());

    let prepared_request = prepare_request_body(
        &raw_body,
        override_effort.as_deref(),
        binding_effort,
        override_model.as_deref(),
        binding_model,
        override_service_tier.as_deref(),
        binding_service_tier,
    );
    let body_for_upstream = prepared_request.body_for_upstream.clone();
    let request_model = prepared_request.request_model.clone();
    let effective_effort = prepared_request.effective_effort.clone();
    let effective_service_tier = prepared_request.base_service_tier.effective.clone();
    let base_service_tier = prepared_request.base_service_tier.clone();
    let request_body_len = prepared_request.request_body_len;

    // A preview limit of 0 means "no preview" for that log level.
    let debug_opt = crate::logging::http_debug_options();
    let warn_opt = crate::logging::http_warn_options();
    let debug_max = if debug_opt.enabled {
        debug_opt.max_body_bytes
    } else {
        0
    };
    let warn_max = if warn_opt.enabled {
        warn_opt.max_body_bytes
    } else {
        0
    };
    let request_body_previews = crate::logging::should_log_request_body_preview();
    let client_body_previews = build_body_previews(
        &raw_body,
        request_flavor.client_content_type.as_deref(),
        request_body_previews,
        debug_max,
        warn_max,
    );
    let client_body_debug = client_body_previews.debug.clone();
    let client_body_warn = client_body_previews.warn.clone();

    let plan = retry_plan(&cfg_snapshot.retry.resolve());
    let cooldown_backoff = CooldownBackoff {
        factor: plan.cooldown_backoff_factor,
        max_secs: plan.cooldown_backoff_max_secs,
    };

    let route_request = route_request_context(
        &method,
        &uri,
        &client_headers,
        request_model.clone(),
        effective_effort.clone(),
        effective_service_tier.clone(),
    );
    let route_selection = proxy
        .lbs_for_request(
            cfg_snapshot.as_ref(),
            v4_snapshot.as_deref(),
            &route_request,
            session_id.as_deref(),
        )
        .await;
    if route_selection.is_empty() {
        let dur = start.elapsed().as_millis() as u64;
        let client_headers_entries = client_headers_entries_cache
            .get_or_init(|| header_map_to_entries(&client_headers))
            .clone();
        return Err(log_no_routable_station(
            proxy,
            &method,
            uri.path(),
            client_uri.as_str(),
            session_id.clone(),
            client_headers_entries,
            dur,
        ));
    }

    // Register the request only after routing succeeded. The error path above returns no
    // request id, so a request begun earlier could never be finished by the caller
    // (same reasoning as the body-read error path, which also returns before this point).
    let request_id = proxy
        .state
        .begin_request(
            proxy.service_name,
            method.as_str(),
            uri.path(),
            session_id.clone(),
            client_name,
            client_addr,
            cwd.clone(),
            request_model.clone(),
            effective_effort.clone(),
            effective_service_tier.clone(),
            started_at_ms,
        )
        .await;

    if let Some(lbs) = route_selection.legacy_lbs() {
        super::route_executor_shadow::maybe_log_route_executor_shadow_diff(
            proxy.service_name,
            request_id,
            lbs,
            request_model.as_deref(),
        );
    }

    Ok(PreparedProxyRequest {
        method,
        uri,
        client_uri,
        client_headers,
        client_headers_entries_cache,
        session_id,
        session_binding,
        route_selection,
        cwd,
        session_override_config,
        global_station_override,
        override_effort,
        override_model,
        override_service_tier,
        body_for_upstream,
        request_model,
        effective_effort,
        effective_service_tier,
        base_service_tier,
        request_body_len,
        request_flavor,
        request_body_previews,
        debug_max,
        warn_max,
        client_body_debug,
        client_body_warn,
        request_id,
        plan,
        cooldown_backoff,
    })
}

/// Builds the [`RouteRequestContext`] used to select upstream route candidates.
///
/// Carries the request path and method, the already-resolved model / reasoning effort /
/// service tier, and every header whose value converts to `&str` via
/// `HeaderValue::to_str`. Headers that fail that conversion are skipped rather than
/// lossily converted.
fn route_request_context(
    method: &Method,
    uri: &Uri,
    headers: &HeaderMap,
    model: Option<String>,
    reasoning_effort: Option<String>,
    service_tier: Option<String>,
) -> RouteRequestContext {
    let textual_headers = headers
        .iter()
        .filter_map(|(header_name, header_value)| {
            let text = header_value.to_str().ok()?;
            Some((header_name.as_str().to_owned(), text.to_owned()))
        })
        .collect();

    RouteRequestContext {
        path: Some(uri.path().to_owned()),
        method: Some(method.as_str().to_owned()),
        headers: textual_headers,
        model,
        reasoning_effort,
        service_tier,
    }
}

/// Resolves the session's working directory, then calls each per-session `touch_*`
/// hook (overrides and binding) with `started_at_ms`.
///
/// Without a session id nothing is looked up or touched and `None` is returned. The cwd
/// lookup happens before any of the touch calls, which run sequentially in a fixed order.
async fn resolve_and_touch_session_state(
    proxy: &ProxyService,
    session_id: Option<&str>,
    started_at_ms: u64,
) -> Option<String> {
    let sid = session_id?;
    let state = &proxy.state;

    let cwd = state.resolve_session_cwd(sid).await;

    state.touch_session_override(sid, started_at_ms).await;
    state.touch_session_station_override(sid, started_at_ms).await;
    state
        .touch_session_route_target_override(sid, started_at_ms)
        .await;
    state.touch_session_model_override(sid, started_at_ms).await;
    state
        .touch_session_service_tier_override(sid, started_at_ms)
        .await;
    state.touch_session_binding(sid, started_at_ms).await;

    cwd
}