1use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use pingora_core::protocols::Digest;
8use pingora_core::protocols::TcpKeepalive;
9use pingora_core::Result as PingoraResult;
10use pingora_core::upstreams::peer::{ALPN, HttpPeer, Peer};
11use pingora_http::{RequestHeader, ResponseHeader};
12use pingora_proxy::{ProxyHttp, Session};
13use tracing::{debug, error, info, warn};
14
15use crate::constants::*;
16use crate::convert::{ResponseRequestContext, response_to_chat};
17use crate::proxy::context_store::ConversationSnapshot;
18use crate::types::chat_api::{ChatMessage, MessageRole};
19use crate::types::response_api::ResponseRequest;
20
21use super::context::ProxyContext;
22use super::core::CodexProxy;
23use crate::proxy::streaming_handler::StreamingResponseHandler;
24
25fn merge_history_messages(
26 mut history: Vec<ChatMessage>,
27 current_turn_messages: Vec<ChatMessage>,
28) -> Vec<ChatMessage> {
29 let mut overlap = 0usize;
32 while overlap < history.len() && overlap < current_turn_messages.len() {
33 let same = serde_json::to_value(&history[overlap]).ok()
34 == serde_json::to_value(¤t_turn_messages[overlap]).ok();
35 if !same {
36 break;
37 }
38 overlap += 1;
39 }
40
41 if overlap > 0 {
42 debug!(
43 "[REQUEST_CONVERT] detected {} overlapping history messages, appending incremental suffix only",
44 overlap
45 );
46 } else if !current_turn_messages.is_empty() {
47 debug!(
48 "[REQUEST_CONVERT] no overlap with cached history, appending all current messages as incremental"
49 );
50 }
51
52 history.extend(current_turn_messages.into_iter().skip(overlap));
53 history
54}
55
56#[async_trait]
57impl ProxyHttp for CodexProxy {
58 type CTX = ProxyContext;
59
60 fn new_ctx(&self) -> Self::CTX {
61 ProxyContext::new()
62 }
63
64 async fn request_filter(
66 &self,
67 session: &mut Session,
68 ctx: &mut Self::CTX,
69 ) -> PingoraResult<bool>
70 where
71 Self::CTX: Send + Sync,
72 {
73 let method = session.req_header().method.as_str().to_string();
74 let path = session.req_header().uri.path().to_string();
75
76 let headers: Vec<(String, String)> = session
78 .req_header()
79 .headers
80 .iter()
81 .map(|(name, value)| {
82 (
83 name.as_str().to_string(),
84 value.to_str().unwrap_or("<binary>").to_string(),
85 )
86 })
87 .collect();
88
89 let (backend_config, backend) = match self.router.select_with_config(&path, &headers) {
91 Some(pair) => pair,
92 None => {
93 warn!("[REQUEST] No matching backend for path: {}", path);
94 return Err(pingora_core::Error::new_str("No matching backend"));
95 }
96 };
97
98 let normalized_path = if let Some(prefix) = backend_config.match_rules.path_prefix.as_deref() {
99 let stripped = path.strip_prefix(prefix).unwrap_or(path.as_str());
100 if stripped.is_empty() {
101 "/".to_string()
102 } else if stripped.starts_with('/') {
103 stripped.to_string()
104 } else {
105 format!("/{}", stripped)
106 }
107 } else {
108 path.clone()
109 };
110 ctx.normalized_path = Some(normalized_path.clone());
111
112 let is_conversion = (normalized_path.starts_with("/v1/responses") || normalized_path.starts_with("/responses")) && method == "POST";
114 ctx.is_conversion_request = is_conversion;
115
116 if is_conversion {
117 debug!("[REQUEST] {} {} -> {} (CONVERSION)", method, normalized_path, "conversion");
118 }
119
120 ctx.selected_backend = Some(backend.clone());
121 ctx.provider_name = Some(backend.name.clone());
122
123 debug!("[REQUEST] {} {} -> {}", method, normalized_path, backend.name);
124
125 Ok(false)
127 }
128
129 async fn upstream_peer(
131 &self,
132 _session: &mut Session,
133 ctx: &mut Self::CTX,
134 ) -> PingoraResult<Box<HttpPeer>> {
135 let backend = ctx.selected_backend.as_ref().ok_or_else(|| {
136 error!("No backend selected");
137 pingora_core::Error::new_str("No backend selected")
138 })?;
139
140 let mut peer = HttpPeer::new(
141 (backend.host.as_str(), backend.port),
142 backend.use_tls,
143 backend.host.clone(),
144 );
145
146 peer.options.alpn = ALPN::H2H1;
148
149 peer.options.connection_timeout = Some(Duration::from_secs(10));
151 peer.options.total_connection_timeout = Some(Duration::from_secs(30));
152 peer.options.idle_timeout = Some(Duration::from_secs(90));
153 peer.options.tcp_keepalive = Some(TcpKeepalive {
154 idle: Duration::from_secs(60),
155 interval: Duration::from_secs(5),
156 count: 5,
157 });
158
159 if backend.use_tls {
160 peer.options.h2_ping_interval = Some(Duration::from_secs(30));
161 }
162
163 Ok(Box::new(peer))
164 }
165
166 async fn upstream_request_filter(
168 &self,
169 session: &mut Session,
170 upstream_request: &mut RequestHeader,
171 ctx: &mut Self::CTX,
172 ) -> PingoraResult<()> {
173 let backend = ctx.selected_backend.as_ref().ok_or_else(|| {
174 error!("No backend selected");
175 pingora_core::Error::new_str("No backend selected")
176 })?;
177
178 let original_uri = session.req_header().uri.clone();
179 let path = original_uri.path().to_string();
180 let query = original_uri.query();
181 let normalized_path = ctx.normalized_path.as_deref().unwrap_or(path.as_str());
182
183 let is_conversion_request = (normalized_path.starts_with("/v1/responses")
185 || normalized_path.starts_with("/responses"))
186 && upstream_request.method.as_str() == "POST";
187
188 let chat_api_path = if is_conversion_request || normalized_path.starts_with("/v1/chat/completions") {
191 if let Some(provider) = self.get_provider(&backend.name) {
193 provider.chat_completions_path()
194 } else {
195 "/v1/chat/completions".to_string()
197 }
198 } else {
199 normalized_path.to_string()
200 };
201
202 let new_path = if !backend.base_path.is_empty() {
204 format!("{}{}", backend.base_path, chat_api_path)
205 } else {
206 chat_api_path
207 };
208
209 let new_uri_str = if let Some(q) = query {
211 format!("{}?{}", new_path, q)
212 } else {
213 new_path.clone()
214 };
215
216 let new_uri: http::Uri = new_uri_str.parse().map_err(|e| {
217 error!("URI rewrite failed: {}", e);
218 pingora_core::Error::new_str("URI rewrite failed")
219 })?;
220 upstream_request.set_uri(new_uri);
221 ctx.rewritten_path = Some(new_path);
222
223 upstream_request.remove_header("x-api-key");
225 upstream_request.remove_header("authorization");
226
227 if is_conversion_request {
230 upstream_request.remove_header("content-length");
231 debug!("[UPSTREAM] Removed content-length for body transformation");
232 }
233
234 upstream_request
236 .insert_header("authorization", format!("Bearer {}", backend.api_key))
237 .map_err(|e| {
238 error!("Failed to inject authorization header: {}", e);
239 pingora_core::Error::new_str("Header injection failed")
240 })?;
241
242 upstream_request
244 .insert_header("host", &backend.host)
245 .map_err(|e| {
246 error!("Failed to set host header: {}", e);
247 pingora_core::Error::new_str("Host header failed")
248 })?;
249
250 debug!(
251 "[UPSTREAM] {} {} -> {}",
252 upstream_request.method.as_str(),
253 upstream_request.uri,
254 backend.name
255 );
256
257 Ok(())
258 }
259
260 async fn request_body_filter(
266 &self,
267 _session: &mut Session,
268 body: &mut Option<Bytes>,
269 end_of_stream: bool,
270 ctx: &mut Self::CTX,
271 ) -> PingoraResult<()>
272 where
273 Self::CTX: Send + Sync,
274 {
275 if ctx.is_conversion_request {
278 if let Some(b) = body {
280 if ctx.request_body.len() + b.len() > MAX_REQUEST_BODY_SIZE {
281 error!("[BODY] Request body exceeds maximum size limit of {} bytes", MAX_REQUEST_BODY_SIZE);
282 return Err(pingora_core::Error::new_str("Request body too large"));
283 }
284 ctx.request_body.extend_from_slice(b);
285 debug!("[BODY] Buffered {} bytes (total: {})", b.len(), ctx.request_body.len());
286 }
287
288 *body = Some(Bytes::new());
291
292 if end_of_stream {
294 debug!("[BODY] Conversion request complete, {} bytes buffered", ctx.request_body.len());
295
296 let backend_name = ctx.selected_backend.as_ref().map(|b| b.name.clone());
297
298 if let (Some(b_name), Some(_backend)) = (backend_name.as_ref(), ctx.selected_backend.as_ref())
300 && let Some(provider) = self.get_provider(b_name) {
301 let model_override = ctx.selected_backend.as_ref()
303 .and_then(|b| b.model.as_deref())
304 .map(|s| s.to_string());
305 match serde_json::from_slice::<ResponseRequest>(&ctx.request_body) {
307 Ok(mut response_req) => {
308 let mut previous_messages: Option<Vec<ChatMessage>> = None;
309 if let Some(prev_id) = response_req.previous_response_id.clone() {
310 if let Some(snapshot) = self.get_conversation(&prev_id) {
311 if matches!(
312 &response_req.input,
313 crate::types::response_api::InputItemOrString::Array(_)
314 ) {
315 debug!(
316 "[REQUEST_CONVERT] previous_response_id + input[] detected, applying prefer-previous merge policy"
317 );
318 }
319 if response_req.instructions.is_none() {
320 response_req.instructions = snapshot.instructions.clone();
321 }
322 previous_messages = Some(snapshot.messages);
323 } else {
324 warn!(
325 "[REQUEST_CONVERT] previous_response_id not found in context store: {}",
326 prev_id
327 );
328 }
329 }
330 let context = ResponseRequestContext::from(&response_req);
331 ctx.set_response_request_context(context);
333 match response_to_chat(response_req, provider.as_ref(), model_override.as_deref()) {
335 Ok(mut chat_req) => {
336 if let Some(history) = previous_messages {
337 chat_req.messages = merge_history_messages(history, chat_req.messages);
338 }
339 ctx.pending_instructions = chat_req
340 .messages
341 .iter()
342 .find(|m| m.role == MessageRole::System)
343 .map(|m| m.content.as_text());
344 ctx.pending_conversation_messages = Some(chat_req.messages.clone());
345 match serde_json::to_vec(&chat_req) {
347 Ok(converted) => {
348 let converted_len = converted.len();
349 if self.log_body {
351 debug!("[CONVERTED REQUEST] {}", String::from_utf8_lossy(&converted));
352 }
353 let path = self.log_dir.join("converted_request.json");
355 if std::fs::write(&path, &converted).is_ok() {
356 debug!("[CONVERTED REQUEST SAVED] to {}", path.display());
357 }
358 *body = Some(Bytes::from(converted));
360 debug!("[BODY] Sending converted body: {} bytes", converted_len);
361 }
362 Err(e) => {
363 error!("Failed to serialize ChatRequest: {}", e);
364 *body = Some(Bytes::from(ctx.request_body.clone()));
366 }
367 }
368 }
369 Err(e) => {
370 error!("Failed to convert request: {}", e);
371 *body = Some(Bytes::from(ctx.request_body.clone()));
373 }
374 }
375 }
376 Err(e) => {
377 debug!("Failed to parse as ResponseRequest, keeping original: {}", e);
379 *body = Some(Bytes::from(ctx.request_body.clone()));
381 let path = self.log_dir.join("codex_request_body.json");
383 if std::fs::write(&path, &ctx.request_body).is_ok() {
384 debug!("[REQUEST BODY SAVED] to {}", path.display());
385 }
386 }
387 }
388 }
389
390 ctx.init_from_request_body();
392 }
393
394 return Ok(());
395 }
396
397 if let Some(b) = body {
399 ctx.request_body.extend_from_slice(b);
400 }
401
402 if end_of_stream && !ctx.request_body.is_empty() {
404 ctx.init_from_request_body();
406 }
407
408 Ok(())
409 }
410
411 async fn response_filter(
413 &self,
414 _session: &mut Session,
415 upstream_response: &mut ResponseHeader,
416 ctx: &mut Self::CTX,
417 ) -> PingoraResult<()> {
418 let status = upstream_response.status.as_u16();
419 let content_type = upstream_response
420 .headers
421 .get("content-type")
422 .and_then(|v| v.to_str().ok())
423 .unwrap_or("")
424 .to_string();
425 let is_sse = content_type.to_ascii_lowercase().contains("text/event-stream");
426 let is_success = (200..300).contains(&status);
427
428 ctx.upstream_status = Some(status);
429 ctx.upstream_content_type = Some(content_type.clone());
430 ctx.should_convert_stream_response =
431 ctx.is_streaming && ctx.is_conversion_request && is_success && is_sse;
432
433 if ctx.is_conversion_request {
434 upstream_response.remove_header("content-length");
435 debug!(
436 "[RESPONSE] removed content-length for conversion response (status={}, content_type={})",
437 status,
438 content_type
439 );
440 }
441
442 if ctx.is_streaming && ctx.is_conversion_request && !ctx.should_convert_stream_response {
443 warn!(
444 "[RESPONSE] bypass stream conversion: status={}, content_type='{}', reason={}",
445 status,
446 content_type,
447 if !is_success {
448 "upstream_non_2xx"
449 } else if !is_sse {
450 "upstream_not_sse"
451 } else {
452 "unknown"
453 }
454 );
455 }
456
457 debug!(
458 "[RESPONSE] status={}, is_streaming={}, is_conversion={}, should_convert_stream={}",
459 status,
460 ctx.is_streaming,
461 ctx.is_conversion_request,
462 ctx.should_convert_stream_response
463 );
464 Ok(())
465 }
466
467 fn response_body_filter(
469 &self,
470 _session: &mut Session,
471 body: &mut Option<Bytes>,
472 end_of_body: bool,
473 ctx: &mut Self::CTX,
474 ) -> PingoraResult<Option<Duration>>
475 where
476 Self::CTX: Send + Sync,
477 {
478 let body_clone = body.clone();
480
481 debug!(
482 "[RESPONSE_BODY] len={:?}, end={}, is_streaming={}, is_conversion={}",
483 body_clone.as_ref().map(|b| b.len()),
484 end_of_body,
485 ctx.is_streaming,
486 ctx.is_conversion_request
487 );
488
489 if let Some(b) = body_clone.as_ref() {
490 if !ctx.is_streaming
494 && ctx.is_conversion_request
495 && ctx.response_body.len() + b.len() > MAX_RESPONSE_BODY_SIZE
496 {
497 warn!(
498 "[RESPONSE_BODY] Response body exceeds maximum size limit of {} bytes",
499 MAX_RESPONSE_BODY_SIZE
500 );
501 } else {
502 ctx.response_body.extend_from_slice(b);
503 }
504
505 if !ctx.is_streaming && ctx.is_conversion_request && !end_of_body {
508 *body = Some(Bytes::new());
509 }
510
511 if ctx.should_convert_stream_response {
513 *body = Some(Bytes::new());
516
517 let provider = ctx.provider_name.as_ref()
519 .and_then(|name| self.get_provider(name));
520
521 let mut handler = StreamingResponseHandler::new(
523 ctx,
524 provider,
525 self.log_body,
526 self.conversation_store.clone(),
527 );
528
529 if let Some(converted) = handler.process_stream_frame() {
531 *body = Some(Bytes::from(converted));
532 }
533
534 if end_of_body {
536 let completed_events = handler.finalize_stream();
537 if !completed_events.is_empty() {
538 let existing = std::str::from_utf8(body.as_ref().unwrap_or(&Bytes::new()))
539 .unwrap_or("")
540 .to_string();
541 let combined = format!("{}{}", existing, completed_events.join(""));
542 *body = Some(Bytes::from(combined));
543 }
544 }
545 }
546 }
547
548 if end_of_body {
549 let duration_ms = ctx.start_time.elapsed().as_millis() as u64;
550
551 if !ctx.is_streaming && ctx.is_conversion_request && !ctx.response_body.is_empty()
553 && let Ok(text) = std::str::from_utf8(&ctx.response_body)
554 && let Ok(chat_resp) = serde_json::from_str::<crate::types::chat_api::ChatResponse>(text) {
555 let assistant_message = chat_resp.choices.first().map(|c| c.message.clone());
556 let request_context = ctx.stream_state.as_ref()
558 .and_then(|s| s.request_context.as_ref());
559 match crate::convert::chat_to_response_with_context(
560 chat_resp,
561 request_context,
562 ) {
563 Ok(response_obj) => {
564 if let Ok(converted) = serde_json::to_vec(&response_obj) {
565 if self.log_body {
566 debug!("[CONVERTED RESPONSE] {}", String::from_utf8_lossy(&converted));
567 }
568 *body = Some(Bytes::from(converted));
569 }
570 if let (Some(mut messages), Some(assistant_message)) = (
571 ctx.pending_conversation_messages.clone(),
572 assistant_message,
573 ) {
574 messages.push(assistant_message);
575 self.store_conversation(
576 response_obj.id.clone(),
577 ConversationSnapshot {
578 instructions: ctx.pending_instructions.clone(),
579 messages,
580 },
581 );
582 }
583 }
584 Err(e) => {
585 error!("Failed to convert response: {}", e);
586 }
587 }
588 }
589
590 info!(
591 "[DONE] provider={}, model={:?}, duration={}ms",
592 ctx.provider_name.as_deref().unwrap_or("unknown"),
593 ctx.model.as_ref(),
594 duration_ms
595 );
596 }
597
598 Ok(None)
599 }
600
601 async fn connected_to_upstream(
603 &self,
604 _session: &mut Session,
605 reused: bool,
606 peer: &HttpPeer,
607 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
608 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
609 digest: Option<&Digest>,
610 ctx: &mut Self::CTX,
611 ) -> PingoraResult<()> {
612 let tls_version = digest
613 .and_then(|d| d.ssl_digest.as_ref())
614 .map(|ssl| ssl.version.to_string())
615 .unwrap_or_else(|| "none".to_string());
616
617 let use_tls = ctx.selected_backend.as_ref().map(|b| b.use_tls).unwrap_or(false);
618 let backend_name = ctx.provider_name.as_deref().unwrap_or("unknown");
619
620 info!(
621 "[CONNECT] {} -> {} (backend={}, TLS={}, reused={}, tls_version={})",
622 peer.sni(),
623 peer.address(),
624 backend_name,
625 use_tls,
626 reused,
627 tls_version
628 );
629
630 Ok(())
631 }
632
633 fn error_while_proxy(
635 &self,
636 peer: &HttpPeer,
637 _session: &mut Session,
638 e: Box<pingora_core::Error>,
639 ctx: &mut Self::CTX,
640 _client_reused: bool,
641 ) -> Box<pingora_core::Error> {
642 error!(
643 "[ERROR] proxy error to {}: {}",
644 peer.address(),
645 e
646 );
647
648 let mut e = e.more_context(format!("Provider: {}", ctx.provider_name.as_deref().unwrap_or("unknown")));
649 e.retry.decide_reuse(false);
650 e
651 }
652
653 async fn fail_to_proxy(
655 &self,
656 session: &mut Session,
657 e: &pingora_core::Error,
658 ctx: &mut Self::CTX,
659 ) -> pingora_proxy::FailToProxy
660 where
661 Self::CTX: Send + Sync,
662 {
663 let code = match *e.etype() {
664 pingora_core::ErrorType::ConnectTimedout => 504,
665 pingora_core::ErrorType::ConnectRefused => 502,
666 pingora_core::ErrorType::TLSHandshakeFailure => 502,
667 _ => 502,
668 };
669
670 let method = session.req_header().method.as_str();
671 let uri = &session.req_header().uri;
672
673 error!(
674 "[FAIL] {} {} -> {} (provider: {}, model: {:?}): {}",
675 method,
676 uri,
677 code,
678 ctx.provider_name.as_deref().unwrap_or("unknown"),
679 ctx.model.as_ref(),
680 e
681 );
682
683 let error_body = serde_json::json!({
685 "error": {
686 "type": "proxy_error",
687 "code": code,
688 "message": e.to_string()
689 }
690 })
691 .to_string();
692 if let Ok(mut resp) = pingora_http::ResponseHeader::build(code, None) {
693 let _ = resp.insert_header("content-type", "application/json");
694 let _ = resp.insert_header("content-length", error_body.len().to_string());
695 let _ = session.write_response_header(Box::new(resp), false).await;
696 let _ = session
697 .write_response_body(Some(bytes::Bytes::from(error_body)), true)
698 .await;
699 }
700
701 pingora_proxy::FailToProxy {
702 error_code: code,
703 can_reuse_downstream: false,
704 }
705 }
706}
707
708#[cfg(test)]
709mod tests {
710 use std::collections::HashMap;
711 use std::sync::Arc;
712
713 use crate::config::{BackendConfig, MatchRules};
714 use crate::types::chat_api::{ChatMessage, Content, MessageRole};
715 use crate::providers::GLMProvider;
716 use crate::providers::Provider;
717
718 use super::CodexProxy;
719
720 #[test]
721 fn test_proxy_creation() {
722 let configs = vec![BackendConfig {
723 name: "glm".to_string(),
724 url: "https://api.example.com".to_string(),
725 api_key: "test-key".to_string(),
726 protocol: "openai".to_string(),
727 model: None,
728 match_rules: MatchRules {
729 default: true,
730 ..Default::default()
731 },
732 }];
733 let router = Arc::new(crate::config::BackendRouter::new(configs).unwrap());
734 let mut providers = HashMap::new();
735 providers.insert("glm".to_string(), Box::new(GLMProvider) as Box<dyn Provider + Send + Sync>);
736 let proxy = CodexProxy::new(router, providers, true, std::path::PathBuf::from("logs"));
737 assert!(proxy.log_body);
738 }
739
740 fn msg(role: MessageRole, text: &str) -> ChatMessage {
741 ChatMessage {
742 role,
743 content: Content::String(text.to_string()),
744 name: None,
745 annotations: None,
746 tool_calls: None,
747 function_call: None,
748 tool_call_id: None,
749 refusal: None,
750 }
751 }
752
753 #[test]
754 fn test_merge_history_messages_prefers_previous_and_appends_incremental() {
755 let history = vec![
756 msg(MessageRole::System, "You are helpful"),
757 msg(MessageRole::User, "hello"),
758 msg(MessageRole::Assistant, "hi"),
759 ];
760 let current = vec![
761 msg(MessageRole::System, "You are helpful"),
762 msg(MessageRole::User, "hello"),
763 msg(MessageRole::Assistant, "hi"),
764 msg(MessageRole::User, "next question"),
765 ];
766
767 let merged = super::merge_history_messages(history, current);
768 assert_eq!(merged.len(), 4);
769 assert_eq!(merged[3].content.as_text(), "next question");
770 }
771
772 #[test]
773 fn test_merge_history_messages_when_no_overlap_appends_all_current() {
774 let history = vec![msg(MessageRole::System, "system"), msg(MessageRole::Assistant, "a1")];
775 let current = vec![msg(MessageRole::User, "new question")];
776
777 let merged = super::merge_history_messages(history, current);
778 assert_eq!(merged.len(), 3);
779 assert_eq!(merged[2].content.as_text(), "new question");
780 }
781}