use super::*;
/// Reports whether a response header is one of the names that the header-copy
/// loops in this module leave out.
///
/// The comparison is ASCII case-insensitive and matches the whole name only.
/// The recognised names are `connection`, `content-encoding`,
/// `content-length`, `date`, `server` and `transfer-encoding`.
pub(super) fn should_skip_runtime_response_header(name: &str) -> bool {
    const SKIPPED_HEADERS: [&str; 6] = [
        "connection",
        "content-encoding",
        "content-length",
        "date",
        "server",
        "transfer-encoding",
    ];
    SKIPPED_HEADERS
        .iter()
        .any(|skipped| name.eq_ignore_ascii_case(skipped))
}
/// Buffers an upstream response and converts the buffered parts into a
/// `tiny_http` response.
///
/// `shared`, `response` and `prelude` are handed unchanged to
/// `buffer_runtime_proxy_async_response_parts`; its output is passed to
/// `build_runtime_proxy_response_from_parts`.
///
/// # Errors
///
/// Returns whatever error the buffering step reports.
pub(super) fn forward_runtime_proxy_response(
    shared: &RuntimeRotationProxyShared,
    response: reqwest::Response,
    prelude: Vec<u8>,
) -> Result<tiny_http::ResponseBox> {
    let buffered = buffer_runtime_proxy_async_response_parts(shared, response, prelude)?;
    let reply = build_runtime_proxy_response_from_parts(buffered);
    Ok(reply)
}
/// Variant of the buffering forwarder that also passes a `max_bytes` limit to
/// the buffering step.
///
/// All four arguments are handed unchanged to
/// `buffer_runtime_proxy_async_response_parts_with_limit`; its output is
/// passed to `build_runtime_proxy_response_from_parts`.
///
/// # Errors
///
/// Returns whatever error the buffering step reports.
pub(super) fn forward_runtime_proxy_response_with_limit(
    shared: &RuntimeRotationProxyShared,
    response: reqwest::Response,
    prelude: Vec<u8>,
    max_bytes: usize,
) -> Result<tiny_http::ResponseBox> {
    let buffered =
        buffer_runtime_proxy_async_response_parts_with_limit(shared, response, prelude, max_bytes)?;
    let reply = build_runtime_proxy_response_from_parts(buffered);
    Ok(reply)
}
/// Request-side inputs consumed by `prepare_runtime_proxy_responses_success`.
///
/// The struct is destructured at the top of that function; every borrowed
/// field shares the lifetime `'a`.
pub(super) struct RuntimeResponsesSuccessContext<'a> {
/// Numeric request identifier; interpolated into log lines and passed to the
/// prefetch stream and the SSE lookahead.
pub(super) request_id: u64,
/// `previous_response_id` sent with the request, if any; passed to
/// `remember_runtime_successful_previous_response_owner`.
pub(super) request_previous_response_id: Option<&'a str>,
/// Session id sent with the request, if any; passed to
/// `remember_runtime_session_id` and `release_runtime_compact_lineage`.
pub(super) request_session_id: Option<&'a str>,
/// Turn state sent with the request, if any; passed to
/// `release_runtime_compact_lineage`.
pub(super) request_turn_state: Option<&'a str>,
/// Turn state used when the response carries no `x-codex-turn-state` header;
/// it is tried before any value extracted from the body or the SSE lookahead.
pub(super) turn_state_override: Option<&'a str>,
/// Shared proxy state handed to the logging and bookkeeping helpers.
pub(super) shared: &'a RuntimeRotationProxyShared,
/// Name of the profile that served the request.
pub(super) profile_name: &'a str,
/// In-flight guard for the profile. It is moved into the streaming response
/// on the SSE paths; the buffered path never uses it, so there it is dropped
/// when the function returns.
pub(super) inflight_guard: RuntimeProfileInFlightGuard,
}
/// Converts a successful upstream reply on the Responses route into a
/// `RuntimeResponsesAttempt`, recording routing bookkeeping along the way.
///
/// The previous-response owner and the session id are remembered first. The
/// reply is then handled according to its `Content-Type`:
///
/// * Anything other than `text/event-stream` is buffered in full. Its turn
///   state is remembered; response ids are remembered (and the compact lineage
///   released) only when at least one id was found in the body. The reply is
///   returned as `RuntimeResponsesReply::Buffered`.
/// * An event stream is handed to a `RuntimePrefetchStream` and its opening
///   bytes are inspected. A quota-blocked or previous-response-not-found
///   prelude is returned through the matching attempt variant, with the
///   prelude replayed in front of the remaining stream. Otherwise the turn
///   state and response ids are remembered and the stream is wrapped in a
///   `RuntimeSseTapReader`.
///
/// The turn state is resolved in priority order: the `x-codex-turn-state`
/// response header, then `turn_state_override`, then the value extracted from
/// the body (buffered) or from the lookahead (streaming).
///
/// # Errors
///
/// Propagates errors from the `remember_*` helpers, from buffering the
/// response, and from the SSE lookahead inspection. The result of
/// `release_runtime_compact_lineage` is discarded.
pub(super) fn prepare_runtime_proxy_responses_success(
    context: RuntimeResponsesSuccessContext<'_>,
    response: reqwest::Response,
) -> Result<RuntimeResponsesAttempt> {
    let RuntimeResponsesSuccessContext {
        request_id,
        request_previous_response_id,
        request_session_id,
        request_turn_state,
        turn_state_override,
        shared,
        profile_name,
        inflight_guard,
    } = context;
    let response_header_turn_state =
        runtime_proxy_header_value(response.headers(), "x-codex-turn-state");
    remember_runtime_successful_previous_response_owner(
        shared,
        profile_name,
        request_previous_response_id,
        RuntimeRouteKind::Responses,
    )?;
    remember_runtime_session_id(
        shared,
        profile_name,
        request_session_id,
        RuntimeRouteKind::Responses,
    )?;
    // Media types are case-insensitive, so compare on a lower-cased copy. This
    // matches the check `write_runtime_streaming_response` applies to the same
    // header; without it an event stream labelled e.g. `Text/Event-Stream`
    // would be buffered in full instead of streamed.
    let is_sse = response
        .headers()
        .get(reqwest::header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .is_some_and(|value| value.to_ascii_lowercase().contains("text/event-stream"));
    runtime_proxy_log(
        shared,
        format!(
            "request={request_id} transport=http prepare_success profile={profile_name} sse={is_sse} turn_state={:?}",
            response_header_turn_state
        ),
    );
    if !is_sse {
        // Unary reply: read the whole body before recording anything derived from it.
        let buffered_started_at = Instant::now();
        let parts = buffer_runtime_proxy_async_response_parts(shared, response, Vec::new())?;
        let response_turn_state = response_header_turn_state
            .or_else(|| turn_state_override.map(str::to_string))
            .or_else(|| extract_runtime_turn_state_from_body_bytes(&parts.body));
        remember_runtime_turn_state(
            shared,
            profile_name,
            response_turn_state.as_deref(),
            RuntimeRouteKind::Responses,
        )?;
        runtime_proxy_log(
            shared,
            format!(
                "request={request_id} transport=http buffered_response_complete profile={profile_name} phase=responses_unary status={} content_type={} body_bytes={} elapsed_ms={}",
                parts.status,
                runtime_buffered_response_content_type(&parts).unwrap_or("-"),
                parts.body.len(),
                buffered_started_at.elapsed().as_millis(),
            ),
        );
        let response_ids = extract_runtime_response_ids_from_body_bytes(&parts.body);
        if !response_ids.is_empty() {
            remember_runtime_response_ids_with_turn_state(
                shared,
                profile_name,
                &response_ids,
                response_turn_state.as_deref(),
                RuntimeRouteKind::Responses,
            )?;
            let _ = release_runtime_compact_lineage(
                shared,
                profile_name,
                request_session_id,
                request_turn_state,
                "response_committed",
            );
        }
        return Ok(RuntimeResponsesAttempt::Success {
            profile_name: profile_name.to_string(),
            response: RuntimeResponsesReply::Buffered(parts),
        });
    }
    // Streaming reply: capture status and forwardable headers before the
    // response is moved into the prefetch stream.
    let status = response.status().as_u16();
    let mut headers = Vec::new();
    for (name, value) in response.headers() {
        if should_skip_runtime_response_header(name.as_str()) {
            continue;
        }
        // Header values that are not valid visible ASCII are dropped.
        if let Ok(value) = value.to_str() {
            headers.push((name.to_string(), value.to_string()));
        }
    }
    let mut prefetch = RuntimePrefetchStream::spawn(
        response,
        Arc::clone(&shared.async_runtime),
        shared.log_path.clone(),
        request_id,
    );
    let lookahead = inspect_runtime_sse_lookahead(&mut prefetch, &shared.log_path, request_id)?;
    let (prelude, response_ids, lookahead_turn_state) = match lookahead {
        RuntimeSseInspection::Commit {
            prelude,
            response_ids,
            turn_state,
        } => {
            runtime_proxy_log(
                shared,
                format!(
                    "request={request_id} transport=http sse_commit profile={profile_name} prelude_bytes={} response_ids={}",
                    prelude.len(),
                    response_ids.len()
                ),
            );
            (prelude, response_ids, turn_state)
        }
        RuntimeSseInspection::QuotaBlocked(prelude) => {
            runtime_proxy_log(
                shared,
                format!(
                    "request={request_id} transport=http sse_quota_blocked profile={profile_name} prelude_bytes={}",
                    prelude.len()
                ),
            );
            // This arm returns, so `headers` can be moved rather than cloned.
            return Ok(RuntimeResponsesAttempt::QuotaBlocked {
                profile_name: profile_name.to_string(),
                response: RuntimeResponsesReply::Streaming(RuntimeStreamingResponse {
                    status,
                    headers,
                    body: Box::new(prefetch.into_reader(prelude)),
                    request_id,
                    profile_name: profile_name.to_string(),
                    log_path: shared.log_path.clone(),
                    shared: shared.clone(),
                    _inflight_guard: Some(inflight_guard),
                }),
            });
        }
        RuntimeSseInspection::PreviousResponseNotFound(prelude) => {
            runtime_proxy_log(
                shared,
                format!(
                    "request={request_id} transport=http route=responses previous_response_not_found profile={profile_name} stage=sse_prelude prelude_bytes={}",
                    prelude.len()
                ),
            );
            // This arm returns, so `headers` can be moved rather than cloned.
            return Ok(RuntimeResponsesAttempt::PreviousResponseNotFound {
                profile_name: profile_name.to_string(),
                response: RuntimeResponsesReply::Streaming(RuntimeStreamingResponse {
                    status,
                    headers,
                    body: Box::new(prefetch.into_reader(prelude)),
                    request_id,
                    profile_name: profile_name.to_string(),
                    log_path: shared.log_path.clone(),
                    shared: shared.clone(),
                    _inflight_guard: Some(inflight_guard),
                }),
                turn_state: response_header_turn_state,
            });
        }
    };
    let response_turn_state = response_header_turn_state
        .or_else(|| turn_state_override.map(str::to_string))
        .or(lookahead_turn_state);
    remember_runtime_turn_state(
        shared,
        profile_name,
        response_turn_state.as_deref(),
        RuntimeRouteKind::Responses,
    )?;
    remember_runtime_response_ids_with_turn_state(
        shared,
        profile_name,
        &response_ids,
        response_turn_state.as_deref(),
        RuntimeRouteKind::Responses,
    )?;
    if !response_ids.is_empty() {
        let _ = release_runtime_compact_lineage(
            shared,
            profile_name,
            request_session_id,
            request_turn_state,
            "response_committed",
        );
    }
    Ok(RuntimeResponsesAttempt::Success {
        profile_name: profile_name.to_string(),
        response: RuntimeResponsesReply::Streaming(RuntimeStreamingResponse {
            status,
            headers,
            // The prelude is both replayed to the client (owned copy moved into
            // the reader) and fed to the tap (borrowed), hence the clone.
            body: Box::new(RuntimeSseTapReader::new(
                prefetch.into_reader(prelude.clone()),
                shared.clone(),
                profile_name.to_string(),
                &prelude,
                &response_ids,
                response_turn_state.as_deref(),
            )),
            request_id,
            profile_name: profile_name.to_string(),
            log_path: shared.log_path.clone(),
            shared: shared.clone(),
            _inflight_guard: Some(inflight_guard),
        }),
    })
}
impl RuntimeSseTapState {
    /// Scans `chunk` byte by byte, assembling lines and dispatching an event
    /// each time a blank line is seen.
    ///
    /// Bytes are appended to `self.line` until a `\n` arrives. A completed
    /// line that starts with `data:` has its payload (leading whitespace
    /// trimmed) appended to `self.data_lines`. A completed blank line parses
    /// the collected data lines as one event, records its turn state and
    /// response ids, and clears the collected data lines. Any other line is
    /// ignored.
    fn observe(&mut self, shared: &RuntimeRotationProxyShared, profile_name: &str, chunk: &[u8]) {
        for &byte in chunk {
            self.line.push(byte);
            if byte != b'\n' {
                continue;
            }
            // A full line is buffered: classify it while it is still borrowed,
            // then reset the line buffer before any event dispatch.
            let is_event_boundary = {
                let decoded = String::from_utf8_lossy(&self.line);
                let content = decoded.trim_end_matches(['\r', '\n']);
                if let Some(payload) = content.strip_prefix("data:") {
                    self.data_lines.push(payload.trim_start().to_string());
                }
                content.is_empty()
            };
            self.line.clear();
            if !is_event_boundary {
                continue;
            }
            let event = parse_runtime_sse_event(&self.data_lines);
            if let Some(latest_turn_state) = event.turn_state {
                self.turn_state = Some(latest_turn_state);
            }
            self.remember_response_ids(
                shared,
                profile_name,
                &event.response_ids,
                RuntimeRouteKind::Responses,
            );
            self.data_lines.clear();
        }
    }

    /// Parses whatever data lines are still collected as a final event and
    /// records its turn state and response ids.
    ///
    /// The collected data lines are left in place, and a partially buffered
    /// line is not examined.
    fn finish(&mut self, shared: &RuntimeRotationProxyShared, profile_name: &str) {
        let event = parse_runtime_sse_event(&self.data_lines);
        if let Some(latest_turn_state) = event.turn_state {
            self.turn_state = Some(latest_turn_state);
        }
        self.remember_response_ids(
            shared,
            profile_name,
            &event.response_ids,
            RuntimeRouteKind::Responses,
        );
    }

    /// Records response ids not seen before and, once a turn state is known,
    /// re-records earlier ids that were never stored together with one.
    ///
    /// Up to two calls to `remember_runtime_response_ids_with_turn_state` are
    /// made: one for the newly seen ids, one for previously seen ids that now
    /// get a turn state. The results of both calls are discarded.
    fn remember_response_ids(
        &mut self,
        shared: &RuntimeRotationProxyShared,
        profile_name: &str,
        response_ids: &[String],
        verified_route: RuntimeRouteKind,
    ) {
        // Ids that this tap has not seen before.
        let mut fresh_ids = Vec::new();
        for response_id in response_ids {
            if self.remembered_response_ids.insert(response_id.clone()) {
                fresh_ids.push(response_id.clone());
            }
        }

        // With a turn state available, every known id is marked as paired with
        // one. Ids marked just now that are not fresh need a second write; the
        // fresh ones get the turn state through the first write below.
        let mut rebound_ids = Vec::new();
        if self.turn_state.is_some() {
            for response_id in self.remembered_response_ids.iter() {
                let newly_marked = self
                    .response_ids_with_turn_state
                    .insert(response_id.clone());
                if newly_marked && !fresh_ids.contains(response_id) {
                    rebound_ids.push(response_id.clone());
                }
            }
        }

        if !fresh_ids.is_empty() {
            let _ = remember_runtime_response_ids_with_turn_state(
                shared,
                profile_name,
                &fresh_ids,
                self.turn_state.as_deref(),
                verified_route,
            );
        }
        if !rebound_ids.is_empty() {
            let _ = remember_runtime_response_ids_with_turn_state(
                shared,
                profile_name,
                &rebound_ids,
                self.turn_state.as_deref(),
                verified_route,
            );
        }
    }
}
/// `Read` adapter that passes bytes through from `inner` while feeding every
/// chunk it reads to a `RuntimeSseTapState`.
pub(crate) struct RuntimeSseTapReader {
/// Underlying byte source whose output is returned to the caller.
inner: Box<dyn Read + Send>,
/// Shared proxy state handed to the tap state and to failure reporting.
shared: RuntimeRotationProxyShared,
/// Name of the profile serving this stream.
profile_name: String,
/// Line/event scanner state updated on every read.
state: RuntimeSseTapState,
}
impl Read for RuntimePrefetchReader {
    /// Reads the next bytes of the prefetched stream into `buf`.
    ///
    /// Bytes are served from the current `pending` chunk first. When that is
    /// exhausted the next chunk is taken from `backlog`, or failing that from
    /// `receiver`, waiting at most `runtime_proxy_stream_idle_timeout_ms()`
    /// milliseconds.
    ///
    /// Returns `Ok(0)` when `buf` is empty, at end of stream, and on every
    /// call after the reader has finished.
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::TimedOut` when no chunk arrives within the idle timeout.
    /// * The recorded terminal error when the channel disconnects and
    ///   `runtime_prefetch_terminal_error` reports one.
    /// * The kind and message carried by a `RuntimePrefetchChunk::Error`.
    ///
    /// Each of these marks the reader finished.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // With a zero-length buffer `pending.read` reports 0 even while data
        // is still pending. Falling into the loop would then replace `pending`
        // with the next chunk (dropping unread bytes) or block on the channel,
        // so answer immediately instead.
        if buf.is_empty() || self.finished {
            return Ok(0);
        }
        loop {
            let read = self.pending.read(buf)?;
            if read > 0 {
                return Ok(read);
            }
            // `pending` is drained: fetch the next chunk.
            let next = if let Some(chunk) = self.backlog.pop_front() {
                Some(chunk)
            } else {
                match self
                    .receiver
                    .recv_timeout(Duration::from_millis(runtime_proxy_stream_idle_timeout_ms()))
                {
                    Ok(chunk) => {
                        // Data taken off the channel no longer counts as queued.
                        if let RuntimePrefetchChunk::Data(bytes) = &chunk {
                            runtime_prefetch_release_queued_bytes(&self.shared, bytes.len());
                        }
                        Some(chunk)
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        self.finished = true;
                        return Err(io::Error::new(
                            io::ErrorKind::TimedOut,
                            "runtime upstream stream idle timed out",
                        ));
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        if let Some((kind, message)) = runtime_prefetch_terminal_error(&self.shared)
                        {
                            self.finished = true;
                            return Err(io::Error::new(kind, message));
                        }
                        // Disconnected without a recorded error: treated as end of stream.
                        None
                    }
                }
            };
            match next {
                Some(RuntimePrefetchChunk::Data(chunk)) => {
                    self.pending = Cursor::new(chunk);
                }
                Some(RuntimePrefetchChunk::End) | None => {
                    self.finished = true;
                    return Ok(0);
                }
                Some(RuntimePrefetchChunk::Error(kind, message)) => {
                    self.finished = true;
                    return Err(io::Error::new(kind, message));
                }
            }
        }
    }
}
// Dropping the reader calls `abort()` on its `worker_abort` handle.
impl Drop for RuntimePrefetchReader {
fn drop(&mut self) {
// NOTE(review): presumably this stops the background worker that feeds
// `receiver` — confirm against where `worker_abort` is created.
self.worker_abort.abort();
}
}
impl RuntimeSseTapReader {
    /// Wraps `inner` in a tap that scans the stream for turn state and
    /// response ids as it is read.
    ///
    /// The tap starts out knowing `remembered_response_ids`. When `turn_state`
    /// is `Some`, those ids are also marked as already paired with a turn
    /// state. `prelude` is run through the scanner immediately, before any
    /// byte is read from `inner`.
    pub(crate) fn new(
        inner: impl Read + Send + 'static,
        shared: RuntimeRotationProxyShared,
        profile_name: String,
        prelude: &[u8],
        remembered_response_ids: &[String],
        turn_state: Option<&str>,
    ) -> Self {
        let mut state = RuntimeSseTapState {
            turn_state: turn_state.map(String::from),
            remembered_response_ids: remembered_response_ids.iter().cloned().collect(),
            response_ids_with_turn_state: if turn_state.is_some() {
                remembered_response_ids.iter().cloned().collect()
            } else {
                Default::default()
            },
            ..RuntimeSseTapState::default()
        };
        state.observe(&shared, &profile_name, prelude);
        Self {
            inner: Box::new(inner),
            shared,
            profile_name,
            state,
        }
    }
}
impl Read for RuntimeSseTapReader {
    /// Reads from the wrapped source and mirrors the outcome into the tap.
    ///
    /// * A read error is reported through
    ///   `note_runtime_profile_transport_failure` (stage `"sse_read"`) and
    ///   then returned unchanged.
    /// * A zero-length read finalises the tap state and returns `Ok(0)`.
    /// * Any other read feeds the freshly filled prefix of `buf` to the tap
    ///   and returns its length.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.inner.read(buf) {
            Err(err) => {
                // The original error goes back to the caller, so an
                // equivalent copy is built for the reporting helper.
                let transport_error =
                    anyhow::Error::new(io::Error::new(err.kind(), err.to_string()));
                note_runtime_profile_transport_failure(
                    &self.shared,
                    &self.profile_name,
                    RuntimeRouteKind::Responses,
                    "sse_read",
                    &transport_error,
                );
                Err(err)
            }
            Ok(0) => {
                self.state.finish(&self.shared, &self.profile_name);
                Ok(0)
            }
            Ok(filled) => {
                self.state
                    .observe(&self.shared, &self.profile_name, &buf[..filled]);
                Ok(filled)
            }
        }
    }
}
/// Writes `response` to `writer` as an HTTP/1.1 message with chunked transfer
/// encoding.
///
/// The status line is followed by fixed `Transfer-Encoding: chunked` and
/// `Connection: close` headers, then by every entry of `response.headers`.
/// The body is read in blocks of up to 8192 bytes and each block is emitted
/// as one chunk. The writer is flushed after the header section, after the
/// first chunk, after every chunk when the response's `content-type` contains
/// `text/event-stream`, and after the terminating zero-length chunk.
///
/// Progress and failures are logged to `response.log_path`; time to the first
/// chunk and total duration are reported through
/// `note_runtime_profile_latency_observation`.
///
/// # Errors
///
/// Returns the first `io::Error` produced by the writer or by reading the
/// body. After a body read error no terminating chunk is written.
pub(crate) fn write_runtime_streaming_response(
writer: Box<dyn Write + Send + 'static>,
mut response: RuntimeStreamingResponse,
) -> io::Result<()> {
let mut writer = writer;
// Event streams are flushed chunk by chunk; the header name and value are
// both compared case-insensitively.
let flush_each_chunk = response.headers.iter().any(|(name, value)| {
name.eq_ignore_ascii_case("content-type")
&& value.to_ascii_lowercase().contains("text/event-stream")
});
let started_at = Instant::now();
// Shared logger for failures on the local (client-facing) writer; `stage`
// names the write step that failed.
let log_writer_error = |stage: &str,
chunk_count: usize,
total_bytes: usize,
err: &io::Error| {
runtime_proxy_log_to_path(
&response.log_path,
&format!(
"local_writer_error request={} transport=http profile={} stage={} chunks={} bytes={} elapsed_ms={} error={}",
response.request_id,
response.profile_name,
stage,
chunk_count,
total_bytes,
started_at.elapsed().as_millis(),
err
),
);
};
runtime_proxy_log_to_path(
&response.log_path,
&format!(
"request={} transport=http stream_start profile={} status={}",
response.request_id, response.profile_name, response.status
),
);
// Reason phrase for the status line; falls back to "OK" when the code is
// invalid or has no canonical reason.
let status = reqwest::StatusCode::from_u16(response.status)
.ok()
.and_then(|status| status.canonical_reason().map(str::to_string))
.unwrap_or_else(|| "OK".to_string());
write!(
writer,
"HTTP/1.1 {} {}\r\nTransfer-Encoding: chunked\r\nConnection: close\r\n",
response.status, status
)
.map_err(|err| {
log_writer_error("headers_start", 0, 0, &err);
err
})?;
// Consumes `response.headers`; the remaining fields stay usable below.
for (name, value) in response.headers {
write!(writer, "{name}: {value}\r\n").map_err(|err| {
log_writer_error("header_line", 0, 0, &err);
err
})?;
}
// Blank line ends the header section.
writer.write_all(b"\r\n").inspect_err(|err| {
log_writer_error("headers_end", 0, 0, err);
})?;
writer.flush().inspect_err(|err| {
log_writer_error("headers_flush", 0, 0, err);
})?;
let mut buffer = [0_u8; 8192];
let mut total_bytes = 0usize;
let mut chunk_count = 0usize;
loop {
let read = match response.body.read(&mut buffer) {
Ok(read) => read,
Err(err) => {
runtime_proxy_log_to_path(
&response.log_path,
&format!(
"request={} transport=http stream_read_error profile={} chunks={} bytes={} elapsed_ms={} error={}",
response.request_id,
response.profile_name,
chunk_count,
total_bytes,
started_at.elapsed().as_millis(),
err
),
);
// Rebuild the error as an `anyhow::Error` so it can be classified; the
// original is returned to the caller.
let transport_error =
anyhow::Error::new(io::Error::new(err.kind(), err.to_string()));
if is_runtime_proxy_transport_failure(&transport_error) {
note_runtime_profile_latency_failure(
&response.shared,
&response.profile_name,
RuntimeRouteKind::Responses,
"stream_read_error",
);
}
return Err(err);
}
};
// A zero-length read ends the body.
if read == 0 {
break;
}
// Fault injection: when the named fault is taken, the first successful
// read is reported as a connection reset instead of being forwarded.
if chunk_count == 0
&& runtime_take_fault_injection("PRODEX_RUNTIME_FAULT_STREAM_READ_ERROR_ONCE")
{
let err = io::Error::new(
io::ErrorKind::ConnectionReset,
"injected runtime stream read failure",
);
runtime_proxy_log_to_path(
&response.log_path,
&format!(
"request={} transport=http stream_read_error profile={} chunks={} bytes={} elapsed_ms={} error={}",
response.request_id,
response.profile_name,
chunk_count,
total_bytes,
started_at.elapsed().as_millis(),
err
),
);
note_runtime_profile_latency_failure(
&response.shared,
&response.profile_name,
RuntimeRouteKind::Responses,
"stream_read_error",
);
return Err(err);
}
chunk_count += 1;
total_bytes += read;
// First chunk: log it and record the elapsed time since `started_at` as "ttfb".
if chunk_count == 1 {
runtime_proxy_log_to_path(
&response.log_path,
&format!(
"request={} transport=http first_local_chunk profile={} bytes={} elapsed_ms={}",
response.request_id,
response.profile_name,
read,
started_at.elapsed().as_millis()
),
);
note_runtime_profile_latency_observation(
&response.shared,
&response.profile_name,
RuntimeRouteKind::Responses,
"ttfb",
started_at.elapsed().as_millis() as u64,
);
}
// Chunk framing: size in upper-case hex, CRLF, payload, CRLF.
write!(writer, "{:X}\r\n", read).map_err(|err| {
log_writer_error("chunk_size", chunk_count, total_bytes, &err);
err
})?;
writer.write_all(&buffer[..read]).inspect_err(|err| {
log_writer_error("chunk_body", chunk_count, total_bytes, err);
})?;
writer.write_all(b"\r\n").inspect_err(|err| {
log_writer_error("chunk_suffix", chunk_count, total_bytes, err);
})?;
// The first chunk is always flushed; later chunks only for event streams.
if flush_each_chunk || chunk_count == 1 {
writer.flush().inspect_err(|err| {
log_writer_error("chunk_flush", chunk_count, total_bytes, err);
})?;
}
}
// Zero-length chunk terminates the chunked body.
writer.write_all(b"0\r\n\r\n").inspect_err(|err| {
log_writer_error("trailer", chunk_count, total_bytes, err);
})?;
writer.flush().inspect_err(|err| {
log_writer_error("trailer_flush", chunk_count, total_bytes, err);
})?;
runtime_proxy_log_to_path(
&response.log_path,
&format!(
"request={} transport=http stream_complete profile={} chunks={} bytes={} elapsed_ms={}",
response.request_id,
response.profile_name,
chunk_count,
total_bytes,
started_at.elapsed().as_millis()
),
);
note_runtime_profile_latency_observation(
&response.shared,
&response.profile_name,
RuntimeRouteKind::Responses,
"stream_complete",
started_at.elapsed().as_millis() as u64,
);
Ok(())
}
/// Looks up `name` in a `reqwest` header map and returns its trimmed value.
///
/// Returns `None` when the header is absent, when its value cannot be viewed
/// as a string via `to_str`, or when nothing remains after trimming
/// whitespace.
pub(super) fn runtime_proxy_header_value(
    headers: &reqwest::header::HeaderMap,
    name: &str,
) -> Option<String> {
    let raw = headers.get(name)?.to_str().ok()?;
    let trimmed = raw.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_string())
}
/// Looks up `name` in a `tungstenite` header map and returns its trimmed value.
///
/// Returns `None` when the header is absent, when its value cannot be viewed
/// as a string via `to_str`, or when nothing remains after trimming
/// whitespace.
pub(super) fn runtime_proxy_tungstenite_header_value(
    headers: &tungstenite::http::HeaderMap,
    name: &str,
) -> Option<String> {
    let raw = headers.get(name)?.to_str().ok()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return None;
    }
    Some(trimmed.to_string())
}