1use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use pingora_core::protocols::Digest;
8use pingora_core::protocols::TcpKeepalive;
9use pingora_core::Result as PingoraResult;
10use pingora_core::upstreams::peer::{ALPN, HttpPeer, Peer};
11use pingora_http::{RequestHeader, ResponseHeader};
12use pingora_proxy::{ProxyHttp, Session};
13use tracing::{debug, error, info, warn};
14
15use crate::constants::*;
16use crate::convert::{ResponseRequestContext, response_to_chat};
17use crate::proxy::context_store::ConversationSnapshot;
18use crate::types::chat_api::{ChatMessage, MessageRole};
19use crate::types::response_api::ResponseRequest;
20
21use super::context::ProxyContext;
22use super::core::CodexProxy;
23use crate::proxy::streaming_handler::StreamingResponseHandler;
24
25fn merge_history_messages(
26 mut history: Vec<ChatMessage>,
27 current_turn_messages: Vec<ChatMessage>,
28) -> Vec<ChatMessage> {
29 let mut overlap = 0usize;
32 while overlap < history.len() && overlap < current_turn_messages.len() {
33 let same = serde_json::to_value(&history[overlap]).ok()
34 == serde_json::to_value(¤t_turn_messages[overlap]).ok();
35 if !same {
36 break;
37 }
38 overlap += 1;
39 }
40
41 if overlap > 0 {
42 debug!(
43 "[REQUEST_CONVERT] detected {} overlapping history messages, appending incremental suffix only",
44 overlap
45 );
46 } else if !current_turn_messages.is_empty() {
47 debug!(
48 "[REQUEST_CONVERT] no overlap with cached history, appending all current messages as incremental"
49 );
50 }
51
52 history.extend(current_turn_messages.into_iter().skip(overlap));
53 history
54}
55
56#[async_trait]
57impl ProxyHttp for CodexProxy {
58 type CTX = ProxyContext;
59
60 fn new_ctx(&self) -> Self::CTX {
61 ProxyContext::new()
62 }
63
64 async fn request_filter(
66 &self,
67 session: &mut Session,
68 ctx: &mut Self::CTX,
69 ) -> PingoraResult<bool>
70 where
71 Self::CTX: Send + Sync,
72 {
73 let method = session.req_header().method.as_str().to_string();
74 let path = session.req_header().uri.path().to_string();
75
76 let headers: Vec<(String, String)> = session
78 .req_header()
79 .headers
80 .iter()
81 .map(|(name, value)| {
82 (
83 name.as_str().to_string(),
84 value.to_str().unwrap_or("<binary>").to_string(),
85 )
86 })
87 .collect();
88
89 let (backend_config, backend) = match self.router.select_with_config(&path, &headers) {
91 Some(pair) => pair,
92 None => {
93 warn!("[REQUEST] No matching backend for path: {}", path);
94 return Err(pingora_core::Error::new_str("No matching backend"));
95 }
96 };
97
98 let normalized_path = if let Some(prefix) = backend_config.match_rules.path_prefix.as_deref() {
99 let stripped = path.strip_prefix(prefix).unwrap_or(path.as_str());
100 if stripped.is_empty() {
101 "/".to_string()
102 } else if stripped.starts_with('/') {
103 stripped.to_string()
104 } else {
105 format!("/{}", stripped)
106 }
107 } else {
108 path.clone()
109 };
110 ctx.normalized_path = Some(normalized_path.clone());
111
112 let is_conversion = normalized_path.starts_with("/v1/responses") || normalized_path.starts_with("/responses");
114 ctx.is_conversion_request = is_conversion;
115
116 if is_conversion {
117 debug!("[REQUEST] {} {} -> {} (CONVERSION)", method, normalized_path, "conversion");
118 }
119
120 ctx.selected_backend = Some(backend.clone());
121 ctx.provider_name = Some(backend.name.clone());
122
123 debug!("[REQUEST] {} {} -> {}", method, normalized_path, backend.name);
124
125 Ok(false)
127 }
128
129 async fn upstream_peer(
131 &self,
132 _session: &mut Session,
133 ctx: &mut Self::CTX,
134 ) -> PingoraResult<Box<HttpPeer>> {
135 let backend = ctx.selected_backend.as_ref().ok_or_else(|| {
136 error!("No backend selected");
137 pingora_core::Error::new_str("No backend selected")
138 })?;
139
140 let mut peer = HttpPeer::new(
141 (backend.host.as_str(), backend.port),
142 backend.use_tls,
143 backend.host.clone(),
144 );
145
146 peer.options.alpn = ALPN::H2H1;
148
149 peer.options.connection_timeout = Some(Duration::from_secs(10));
151 peer.options.total_connection_timeout = Some(Duration::from_secs(30));
152 peer.options.idle_timeout = Some(Duration::from_secs(90));
153 peer.options.tcp_keepalive = Some(TcpKeepalive {
154 idle: Duration::from_secs(60),
155 interval: Duration::from_secs(5),
156 count: 5,
157 });
158
159 if backend.use_tls {
160 peer.options.h2_ping_interval = Some(Duration::from_secs(30));
161 }
162
163 Ok(Box::new(peer))
164 }
165
166 async fn upstream_request_filter(
168 &self,
169 session: &mut Session,
170 upstream_request: &mut RequestHeader,
171 ctx: &mut Self::CTX,
172 ) -> PingoraResult<()> {
173 let backend = ctx.selected_backend.as_ref().ok_or_else(|| {
174 error!("No backend selected");
175 pingora_core::Error::new_str("No backend selected")
176 })?;
177
178 let original_uri = session.req_header().uri.clone();
179 let path = original_uri.path().to_string();
180 let query = original_uri.query();
181 let normalized_path = ctx.normalized_path.as_deref().unwrap_or(path.as_str());
182
183 let is_conversion_request = normalized_path.starts_with("/v1/responses")
185 || normalized_path.starts_with("/responses");
186
187 let chat_api_path = if normalized_path.starts_with("/v1/responses")
190 || normalized_path.starts_with("/responses")
191 || normalized_path.starts_with("/v1/chat/completions")
192 {
193 if let Some(provider) = self.get_provider(&backend.name) {
195 provider.chat_completions_path()
196 } else {
197 "/v1/chat/completions".to_string()
199 }
200 } else {
201 normalized_path.to_string()
202 };
203
204 let new_path = if !backend.base_path.is_empty() {
206 format!("{}{}", backend.base_path, chat_api_path)
207 } else {
208 chat_api_path
209 };
210
211 let new_uri_str = if let Some(q) = query {
213 format!("{}?{}", new_path, q)
214 } else {
215 new_path.clone()
216 };
217
218 let new_uri: http::Uri = new_uri_str.parse().map_err(|e| {
219 error!("URI rewrite failed: {}", e);
220 pingora_core::Error::new_str("URI rewrite failed")
221 })?;
222 upstream_request.set_uri(new_uri);
223 ctx.rewritten_path = Some(new_path);
224
225 upstream_request.remove_header("x-api-key");
227 upstream_request.remove_header("authorization");
228
229 if is_conversion_request {
232 upstream_request.remove_header("content-length");
233 debug!("[UPSTREAM] Removed content-length for body transformation");
234 }
235
236 upstream_request
238 .insert_header("authorization", format!("Bearer {}", backend.api_key))
239 .map_err(|e| {
240 error!("Failed to inject authorization header: {}", e);
241 pingora_core::Error::new_str("Header injection failed")
242 })?;
243
244 upstream_request
246 .insert_header("host", &backend.host)
247 .map_err(|e| {
248 error!("Failed to set host header: {}", e);
249 pingora_core::Error::new_str("Host header failed")
250 })?;
251
252 debug!(
253 "[UPSTREAM] {} {} -> {}",
254 upstream_request.method.as_str(),
255 upstream_request.uri,
256 backend.name
257 );
258
259 Ok(())
260 }
261
262 async fn request_body_filter(
268 &self,
269 _session: &mut Session,
270 body: &mut Option<Bytes>,
271 end_of_stream: bool,
272 ctx: &mut Self::CTX,
273 ) -> PingoraResult<()>
274 where
275 Self::CTX: Send + Sync,
276 {
277 if ctx.is_conversion_request {
280 if let Some(b) = body {
282 if ctx.request_body.len() + b.len() > MAX_REQUEST_BODY_SIZE {
283 error!("[BODY] Request body exceeds maximum size limit of {} bytes", MAX_REQUEST_BODY_SIZE);
284 return Err(pingora_core::Error::new_str("Request body too large"));
285 }
286 ctx.request_body.extend_from_slice(b);
287 debug!("[BODY] Buffered {} bytes (total: {})", b.len(), ctx.request_body.len());
288 }
289
290 *body = Some(Bytes::new());
293
294 if end_of_stream {
296 debug!("[BODY] Conversion request complete, {} bytes buffered", ctx.request_body.len());
297
298 let backend_name = ctx.selected_backend.as_ref().map(|b| b.name.clone());
299
300 if let (Some(b_name), Some(_backend)) = (backend_name.as_ref(), ctx.selected_backend.as_ref())
302 && let Some(provider) = self.get_provider(b_name) {
303 let model_override = ctx.selected_backend.as_ref()
305 .and_then(|b| b.model.as_deref())
306 .map(|s| s.to_string());
307 match serde_json::from_slice::<ResponseRequest>(&ctx.request_body) {
309 Ok(mut response_req) => {
310 let mut previous_messages: Option<Vec<ChatMessage>> = None;
311 if let Some(prev_id) = response_req.previous_response_id.clone() {
312 if let Some(snapshot) = self.get_conversation(&prev_id) {
313 if matches!(
314 &response_req.input,
315 crate::types::response_api::InputItemOrString::Array(_)
316 ) {
317 debug!(
318 "[REQUEST_CONVERT] previous_response_id + input[] detected, applying prefer-previous merge policy"
319 );
320 }
321 if response_req.instructions.is_none() {
322 response_req.instructions = snapshot.instructions.clone();
323 }
324 previous_messages = Some(snapshot.messages);
325 } else {
326 warn!(
327 "[REQUEST_CONVERT] previous_response_id not found in context store: {}",
328 prev_id
329 );
330 }
331 }
332 let context = ResponseRequestContext::from(&response_req);
333 ctx.set_response_request_context(context);
335 match response_to_chat(response_req, provider.as_ref(), model_override.as_deref()) {
337 Ok(mut chat_req) => {
338 if let Some(history) = previous_messages {
339 chat_req.messages = merge_history_messages(history, chat_req.messages);
340 }
341 ctx.pending_instructions = chat_req
342 .messages
343 .iter()
344 .find(|m| m.role == MessageRole::System)
345 .map(|m| m.content.as_text());
346 ctx.pending_conversation_messages = Some(chat_req.messages.clone());
347 match serde_json::to_vec(&chat_req) {
349 Ok(converted) => {
350 let converted_len = converted.len();
351 if self.log_body {
353 debug!("[CONVERTED REQUEST] {}", String::from_utf8_lossy(&converted));
354 }
355 let path = self.log_dir.join("converted_request.json");
357 if std::fs::write(&path, &converted).is_ok() {
358 debug!("[CONVERTED REQUEST SAVED] to {}", path.display());
359 }
360 *body = Some(Bytes::from(converted));
362 debug!("[BODY] Sending converted body: {} bytes", converted_len);
363 }
364 Err(e) => {
365 error!("Failed to serialize ChatRequest: {}", e);
366 *body = Some(Bytes::from(ctx.request_body.clone()));
368 }
369 }
370 }
371 Err(e) => {
372 error!("Failed to convert request: {}", e);
373 *body = Some(Bytes::from(ctx.request_body.clone()));
375 }
376 }
377 }
378 Err(e) => {
379 debug!("Failed to parse as ResponseRequest, keeping original: {}", e);
381 *body = Some(Bytes::from(ctx.request_body.clone()));
383 let path = self.log_dir.join("codex_request_body.json");
385 if std::fs::write(&path, &ctx.request_body).is_ok() {
386 debug!("[REQUEST BODY SAVED] to {}", path.display());
387 }
388 }
389 }
390 }
391
392 ctx.init_from_request_body();
394 }
395
396 return Ok(());
397 }
398
399 if let Some(b) = body {
401 ctx.request_body.extend_from_slice(b);
402 }
403
404 if end_of_stream && !ctx.request_body.is_empty() {
406 ctx.init_from_request_body();
408 }
409
410 Ok(())
411 }
412
413 async fn response_filter(
415 &self,
416 _session: &mut Session,
417 upstream_response: &mut ResponseHeader,
418 ctx: &mut Self::CTX,
419 ) -> PingoraResult<()> {
420 let status = upstream_response.status.as_u16();
421 let content_type = upstream_response
422 .headers
423 .get("content-type")
424 .and_then(|v| v.to_str().ok())
425 .unwrap_or("")
426 .to_string();
427 let is_sse = content_type.to_ascii_lowercase().contains("text/event-stream");
428 let is_success = (200..300).contains(&status);
429
430 ctx.upstream_status = Some(status);
431 ctx.upstream_content_type = Some(content_type.clone());
432 ctx.should_convert_stream_response =
433 ctx.is_streaming && ctx.is_conversion_request && is_success && is_sse;
434
435 if ctx.is_conversion_request {
436 upstream_response.remove_header("content-length");
437 debug!(
438 "[RESPONSE] removed content-length for conversion response (status={}, content_type={})",
439 status,
440 content_type
441 );
442 }
443
444 if ctx.is_streaming && ctx.is_conversion_request && !ctx.should_convert_stream_response {
445 warn!(
446 "[RESPONSE] bypass stream conversion: status={}, content_type='{}', reason={}",
447 status,
448 content_type,
449 if !is_success {
450 "upstream_non_2xx"
451 } else if !is_sse {
452 "upstream_not_sse"
453 } else {
454 "unknown"
455 }
456 );
457 }
458
459 debug!(
460 "[RESPONSE] status={}, is_streaming={}, is_conversion={}, should_convert_stream={}",
461 status,
462 ctx.is_streaming,
463 ctx.is_conversion_request,
464 ctx.should_convert_stream_response
465 );
466 Ok(())
467 }
468
469 fn response_body_filter(
471 &self,
472 _session: &mut Session,
473 body: &mut Option<Bytes>,
474 end_of_body: bool,
475 ctx: &mut Self::CTX,
476 ) -> PingoraResult<Option<Duration>>
477 where
478 Self::CTX: Send + Sync,
479 {
480 let body_clone = body.clone();
482
483 debug!(
484 "[RESPONSE_BODY] len={:?}, end={}, is_streaming={}, is_conversion={}",
485 body_clone.as_ref().map(|b| b.len()),
486 end_of_body,
487 ctx.is_streaming,
488 ctx.is_conversion_request
489 );
490
491 if let Some(b) = body_clone.as_ref() {
492 if !ctx.is_streaming
496 && ctx.is_conversion_request
497 && ctx.response_body.len() + b.len() > MAX_RESPONSE_BODY_SIZE
498 {
499 warn!(
500 "[RESPONSE_BODY] Response body exceeds maximum size limit of {} bytes",
501 MAX_RESPONSE_BODY_SIZE
502 );
503 } else {
504 ctx.response_body.extend_from_slice(b);
505 }
506
507 if !ctx.is_streaming && ctx.is_conversion_request && !end_of_body {
510 *body = Some(Bytes::new());
511 }
512
513 if ctx.should_convert_stream_response {
515 *body = Some(Bytes::new());
518
519 let provider = ctx.provider_name.as_ref()
521 .and_then(|name| self.get_provider(name));
522
523 let mut handler = StreamingResponseHandler::new(
525 ctx,
526 provider,
527 self.log_body,
528 self.conversation_store.clone(),
529 );
530
531 if let Some(converted) = handler.process_stream_frame() {
533 *body = Some(Bytes::from(converted));
534 }
535
536 if end_of_body {
538 let completed_events = handler.finalize_stream();
539 if !completed_events.is_empty() {
540 let existing = std::str::from_utf8(body.as_ref().unwrap_or(&Bytes::new()))
541 .unwrap_or("")
542 .to_string();
543 let combined = format!("{}{}", existing, completed_events.join(""));
544 *body = Some(Bytes::from(combined));
545 }
546 }
547 }
548 }
549
550 if end_of_body {
551 let duration_ms = ctx.start_time.elapsed().as_millis() as u64;
552
553 if !ctx.is_streaming && ctx.is_conversion_request && !ctx.response_body.is_empty()
555 && let Ok(text) = std::str::from_utf8(&ctx.response_body)
556 && let Ok(chat_resp) = serde_json::from_str::<crate::types::chat_api::ChatResponse>(text) {
557 let assistant_message = chat_resp.choices.first().map(|c| c.message.clone());
558 let request_context = ctx.stream_state.as_ref()
560 .and_then(|s| s.request_context.as_ref());
561 match crate::convert::chat_to_response_with_context(
562 chat_resp,
563 request_context,
564 ) {
565 Ok(response_obj) => {
566 if let Ok(converted) = serde_json::to_vec(&response_obj) {
567 if self.log_body {
568 debug!("[CONVERTED RESPONSE] {}", String::from_utf8_lossy(&converted));
569 }
570 *body = Some(Bytes::from(converted));
571 }
572 if let (Some(mut messages), Some(assistant_message)) = (
573 ctx.pending_conversation_messages.clone(),
574 assistant_message,
575 ) {
576 messages.push(assistant_message);
577 self.store_conversation(
578 response_obj.id.clone(),
579 ConversationSnapshot {
580 instructions: ctx.pending_instructions.clone(),
581 messages,
582 },
583 );
584 }
585 }
586 Err(e) => {
587 error!("Failed to convert response: {}", e);
588 }
589 }
590 }
591
592 info!(
593 "[DONE] provider={}, model={:?}, duration={}ms",
594 ctx.provider_name.as_deref().unwrap_or("unknown"),
595 ctx.model.as_ref(),
596 duration_ms
597 );
598 }
599
600 Ok(None)
601 }
602
603 async fn connected_to_upstream(
605 &self,
606 _session: &mut Session,
607 reused: bool,
608 peer: &HttpPeer,
609 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
610 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
611 digest: Option<&Digest>,
612 ctx: &mut Self::CTX,
613 ) -> PingoraResult<()> {
614 let tls_version = digest
615 .and_then(|d| d.ssl_digest.as_ref())
616 .map(|ssl| ssl.version.to_string())
617 .unwrap_or_else(|| "none".to_string());
618
619 let use_tls = ctx.selected_backend.as_ref().map(|b| b.use_tls).unwrap_or(false);
620 let backend_name = ctx.provider_name.as_deref().unwrap_or("unknown");
621
622 info!(
623 "[CONNECT] {} -> {} (backend={}, TLS={}, reused={}, tls_version={})",
624 peer.sni(),
625 peer.address(),
626 backend_name,
627 use_tls,
628 reused,
629 tls_version
630 );
631
632 Ok(())
633 }
634
635 fn error_while_proxy(
637 &self,
638 peer: &HttpPeer,
639 _session: &mut Session,
640 e: Box<pingora_core::Error>,
641 ctx: &mut Self::CTX,
642 _client_reused: bool,
643 ) -> Box<pingora_core::Error> {
644 error!(
645 "[ERROR] proxy error to {}: {}",
646 peer.address(),
647 e
648 );
649
650 let mut e = e.more_context(format!("Provider: {}", ctx.provider_name.as_deref().unwrap_or("unknown")));
651 e.retry.decide_reuse(false);
652 e
653 }
654
655 async fn fail_to_proxy(
657 &self,
658 session: &mut Session,
659 e: &pingora_core::Error,
660 ctx: &mut Self::CTX,
661 ) -> pingora_proxy::FailToProxy
662 where
663 Self::CTX: Send + Sync,
664 {
665 let code = match *e.etype() {
666 pingora_core::ErrorType::ConnectTimedout => 504,
667 pingora_core::ErrorType::ConnectRefused => 502,
668 pingora_core::ErrorType::TLSHandshakeFailure => 502,
669 _ => 502,
670 };
671
672 let method = session.req_header().method.as_str();
673 let uri = &session.req_header().uri;
674
675 error!(
676 "[FAIL] {} {} -> {} (provider: {}, model: {:?}): {}",
677 method,
678 uri,
679 code,
680 ctx.provider_name.as_deref().unwrap_or("unknown"),
681 ctx.model.as_ref(),
682 e
683 );
684
685 let error_body = serde_json::json!({
687 "error": {
688 "type": "proxy_error",
689 "code": code,
690 "message": e.to_string()
691 }
692 })
693 .to_string();
694 if let Ok(mut resp) = pingora_http::ResponseHeader::build(code, None) {
695 let _ = resp.insert_header("content-type", "application/json");
696 let _ = resp.insert_header("content-length", error_body.len().to_string());
697 let _ = session.write_response_header(Box::new(resp), false).await;
698 let _ = session
699 .write_response_body(Some(bytes::Bytes::from(error_body)), true)
700 .await;
701 }
702
703 pingora_proxy::FailToProxy {
704 error_code: code,
705 can_reuse_downstream: false,
706 }
707 }
708}
709
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::config::{BackendConfig, MatchRules};
    use crate::providers::GLMProvider;
    use crate::providers::Provider;
    use crate::types::chat_api::{ChatMessage, Content, MessageRole};

    use super::CodexProxy;

    /// Builds a plain-text chat message with every optional field unset.
    fn msg(role: MessageRole, text: &str) -> ChatMessage {
        ChatMessage {
            role,
            content: Content::String(text.to_string()),
            name: None,
            annotations: None,
            tool_calls: None,
            function_call: None,
            tool_call_id: None,
            refusal: None,
        }
    }

    /// A proxy built with body logging enabled reports `log_body == true`.
    #[test]
    fn test_proxy_creation() {
        let default_rules = MatchRules {
            default: true,
            ..Default::default()
        };
        let glm_backend = BackendConfig {
            name: "glm".to_string(),
            url: "https://api.example.com".to_string(),
            api_key: "test-key".to_string(),
            protocol: "openai".to_string(),
            model: None,
            match_rules: default_rules,
        };
        let router = Arc::new(crate::config::BackendRouter::new(vec![glm_backend]).unwrap());

        let glm_provider: Box<dyn Provider + Send + Sync> = Box::new(GLMProvider);
        let mut providers = HashMap::new();
        providers.insert("glm".to_string(), glm_provider);

        let proxy = CodexProxy::new(router, providers, true, std::path::PathBuf::from("logs"));
        assert!(proxy.log_body);
    }

    /// When the current turn repeats the cached history, only the new
    /// trailing message is appended.
    #[test]
    fn test_merge_history_messages_prefers_previous_and_appends_incremental() {
        let cached = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "hello"),
            msg(MessageRole::Assistant, "hi"),
        ];
        let incoming = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "hello"),
            msg(MessageRole::Assistant, "hi"),
            msg(MessageRole::User, "next question"),
        ];

        let merged = super::merge_history_messages(cached, incoming);

        assert_eq!(merged.len(), 4);
        assert_eq!(merged[3].content.as_text(), "next question");
    }

    /// Without any shared prefix, every current message is appended after
    /// the cached history.
    #[test]
    fn test_merge_history_messages_when_no_overlap_appends_all_current() {
        let cached = vec![
            msg(MessageRole::System, "system"),
            msg(MessageRole::Assistant, "a1"),
        ];
        let incoming = vec![msg(MessageRole::User, "new question")];

        let merged = super::merge_history_messages(cached, incoming);

        assert_eq!(merged.len(), 3);
        assert_eq!(merged[2].content.as_text(), "new question");
    }
}