use std::time::Instant;
use axum::body::{Body, Bytes};
use axum::http::{HeaderMap, Method};
use axum::response::Response;
use crate::protocol::McpMethod;
use crate::proxy::ProxyState;
use crate::proxy::forwarding::build_response;
use crate::proxy::pipeline::context::RequestContext;
use crate::proxy::pipeline::emit::{ResponseSummary, emit_request_event};
use crate::proxy::pipeline::steps::{health, session};
use super::{capture_session_id, emit_upstream_error, forward_or_502, populate_client_info};
pub async fn forward_and_stream(
state: &ProxyState,
ctx: &mut RequestContext,
method: &McpMethod,
headers: &HeaderMap,
body: &Bytes,
) -> Response {
let upstream_url = state.mcp_upstream.trim_end_matches('/').to_string();
let upstream_start = Instant::now();
let resp = match forward_or_502(
&state.upstream,
&upstream_url,
Method::POST,
headers,
body,
false,
)
.await
{
Ok(r) => r,
Err(e) => return emit_upstream_error(state, ctx, upstream_start, e),
};
let status = resp.status().as_u16();
let upstream_headers = resp.headers().clone();
let upstream_us = upstream_start.elapsed().as_micros() as u64;
capture_session_id(ctx, &upstream_headers);
health::track_post_response(state, method, status);
session::maybe_record_start(state, ctx, method, status).await;
populate_client_info(state, ctx).await;
ctx.tags.push("streamed");
emit_request_event(
state,
ctx,
&ResponseSummary {
status,
response_size: None,
upstream_us: Some(upstream_us),
error_code: None,
error_msg: None,
stage_timings: None,
},
);
build_response(
status,
&upstream_headers,
Body::from_stream(resp.bytes_stream()),
)
}