Skip to main content

codex_convert_proxy/proxy/
filters.rs

1//! Pingora ProxyHttp trait implementation.
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use pingora_core::protocols::Digest;
8use pingora_core::protocols::TcpKeepalive;
9use pingora_core::Result as PingoraResult;
10use pingora_core::upstreams::peer::{ALPN, HttpPeer, Peer};
11use pingora_http::{RequestHeader, ResponseHeader};
12use pingora_proxy::{ProxyHttp, Session};
13use tracing::{debug, error, info, warn};
14
15use crate::constants::*;
16use crate::convert::{ResponseRequestContext, response_to_chat, ToolPriority};
17use crate::proxy::context_store::ConversationSnapshot;
18use crate::types::chat_api::{ChatMessage, MessageRole};
19use crate::types::response_api::ResponseRequest;
20
21use super::context::ProxyContext;
22use super::core::CodexProxy;
23use crate::proxy::streaming_handler::StreamingResponseHandler;
24
25impl CodexProxy {
26    /// Convert a buffered Responses-API request body to a Chat-API body.
27    ///
28    /// Returns `Err(ConversionError)` if any of:
29    /// - no provider is configured for the selected backend
30    /// - body cannot be parsed as `ResponseRequest`
31    /// - protocol conversion fails
32    /// - the converted `ChatRequest` cannot be serialised
33    ///
34    /// Callers should propagate the error to the client as a 4xx response
35    /// (Fail-Fast — earlier behaviour silently passed the original
36    /// Responses-API body to a Chat endpoint and let the upstream return 400,
37    /// which obscured proxy bugs).
38    fn try_convert_request_body(
39        &self,
40        ctx: &mut ProxyContext,
41    ) -> Result<Vec<u8>, crate::error::ConversionError> {
42        use crate::error::ConversionError;
43
44        let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
45            ConversionError::ProviderError("no backend selected".to_string())
46        })?;
47        let model_override = backend.model.clone();
48        let provider = self.get_provider(&backend.name).ok_or_else(|| {
49            ConversionError::ProviderError(format!(
50                "no provider registered for backend '{}'",
51                backend.name
52            ))
53        })?;
54
55        let mut response_req: ResponseRequest =
56            serde_json::from_slice(&ctx.buffers.request_body)?;
57        ctx.init_from_response_request(&response_req);
58
59        // Resolve previous-turn history if requested.
60        let mut previous_messages: Option<Vec<ChatMessage>> = None;
61        if let Some(prev_id) = response_req.previous_response_id.clone() {
62            if let Some(snapshot) = self.get_conversation(&prev_id) {
63                if matches!(
64                    &response_req.input,
65                    crate::types::response_api::InputItemOrString::Array(_)
66                ) {
67                    debug!(
68                        "[REQUEST_CONVERT] previous_response_id + input[] detected, applying prefer-previous merge policy"
69                    );
70                }
71                if response_req.instructions.is_none() {
72                    response_req.instructions = snapshot.instructions.clone();
73                }
74                previous_messages = Some(snapshot.messages);
75            } else {
76                warn!(
77                    "[REQUEST_CONVERT] previous_response_id not found in context store: {}",
78                    prev_id
79                );
80            }
81        }
82
83        let context = ResponseRequestContext::from(&response_req);
84        ctx.set_response_request_context(context);
85
86        let mut chat_req = response_to_chat(
87            response_req,
88            provider.as_ref(),
89            model_override.as_deref(),
90            ToolPriority::Merge,
91        )?;
92
93        if let Some(history) = previous_messages {
94            chat_req.messages = merge_history_messages(history, chat_req.messages);
95        }
96
97        ctx.follow_up.pending_instructions = chat_req
98            .messages
99            .iter()
100            .find(|m| m.role == MessageRole::System)
101            .map(|m| m.content.as_text());
102        ctx.follow_up.pending_conversation_messages = Some(chat_req.messages.clone());
103
104        serde_json::to_vec(&chat_req).map_err(ConversionError::from)
105    }
106}
107
108fn merge_history_messages(
109    mut history: Vec<ChatMessage>,
110    current_turn_messages: Vec<ChatMessage>,
111) -> Vec<ChatMessage> {
112    // prefer-previous strategy:
113    // history from previous_response_id is authoritative; only append incremental suffix.
114    let mut overlap = 0usize;
115    while overlap < history.len() && overlap < current_turn_messages.len() {
116        let same = serde_json::to_value(&history[overlap]).ok()
117            == serde_json::to_value(&current_turn_messages[overlap]).ok();
118        if !same {
119            break;
120        }
121        overlap += 1;
122    }
123
124    if overlap > 0 {
125        debug!(
126            "[REQUEST_CONVERT] detected {} overlapping history messages, appending incremental suffix only",
127            overlap
128        );
129    } else if !current_turn_messages.is_empty() {
130        debug!(
131            "[REQUEST_CONVERT] no overlap with cached history, appending all current messages as incremental"
132        );
133    }
134
135    history.extend(current_turn_messages.into_iter().skip(overlap));
136    history
137}
138
139#[async_trait]
140impl ProxyHttp for CodexProxy {
141    type CTX = ProxyContext;
142
143    fn new_ctx(&self) -> Self::CTX {
144        ProxyContext::new()
145    }
146
147    /// Request filter - called for each request to select backend and prepare context.
148    async fn request_filter(
149        &self,
150        session: &mut Session,
151        ctx: &mut Self::CTX,
152    ) -> PingoraResult<bool>
153    where
154        Self::CTX: Send + Sync,
155    {
156        let method = session.req_header().method.as_str().to_string();
157        let path = session.req_header().uri.path().to_string();
158
159        // Collect headers for routing
160        let headers: Vec<(String, String)> = session
161            .req_header()
162            .headers
163            .iter()
164            .map(|(name, value)| {
165                (
166                    name.as_str().to_string(),
167                    value.to_str().unwrap_or("<binary>").to_string(),
168                )
169            })
170            .collect();
171
172        // Select backend with config to support path_prefix stripping.
173        let (backend_config, backend) = match self.router.select_with_config(&path, &headers) {
174            Some(pair) => pair,
175            None => {
176                warn!("[REQUEST] No matching backend for path: {}", path);
177                return Err(pingora_core::Error::new_str("No matching backend"));
178            }
179        };
180
181        let normalized_path = if let Some(prefix) = backend_config.match_rules.path_prefix.as_deref() {
182            let stripped = path.strip_prefix(prefix).unwrap_or(path.as_str());
183            if stripped.is_empty() {
184                "/".to_string()
185            } else if stripped.starts_with('/') {
186                stripped.to_string()
187            } else {
188                format!("/{}", stripped)
189            }
190        } else {
191            path.clone()
192        };
193        ctx.route.normalized_path = Some(normalized_path.clone());
194
195        // Check if this is a conversion request (Responses API -> Chat API)
196        let is_conversion = (normalized_path.starts_with("/v1/responses") || normalized_path.starts_with("/responses")) && method == "POST";
197        ctx.flags.is_conversion_request = is_conversion;
198
199        if is_conversion {
200            debug!("[REQUEST] {} {} -> {} (CONVERSION)", method, normalized_path, "conversion");
201        }
202
203        ctx.route.selected_backend = Some(backend.clone());
204        ctx.route.provider_name = Some(backend.name.clone());
205
206        debug!("[REQUEST] {} {} -> {}", method, normalized_path, backend.name);
207
208        // Return false to continue processing
209        Ok(false)
210    }
211
212    /// Select upstream peer for proxying.
213    async fn upstream_peer(
214        &self,
215        _session: &mut Session,
216        ctx: &mut Self::CTX,
217    ) -> PingoraResult<Box<HttpPeer>> {
218        let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
219            error!("No backend selected");
220            pingora_core::Error::new_str("No backend selected")
221        })?;
222
223        let mut peer = HttpPeer::new(
224            (backend.host.as_str(), backend.port),
225            backend.use_tls,
226            backend.host.clone(),
227        );
228
229        // HTTP/2 priority
230        peer.options.alpn = ALPN::H2H1;
231
232        // Connection configuration
233        peer.options.connection_timeout = Some(Duration::from_secs(10));
234        peer.options.total_connection_timeout = Some(Duration::from_secs(30));
235        peer.options.idle_timeout = Some(Duration::from_secs(90));
236        peer.options.tcp_keepalive = Some(TcpKeepalive {
237            idle: Duration::from_secs(60),
238            interval: Duration::from_secs(5),
239            count: 5,
240        });
241
242        if backend.use_tls {
243            peer.options.h2_ping_interval = Some(Duration::from_secs(30));
244        }
245
246        Ok(Box::new(peer))
247    }
248
249    /// Filter and modify the upstream request.
250    async fn upstream_request_filter(
251        &self,
252        session: &mut Session,
253        upstream_request: &mut RequestHeader,
254        ctx: &mut Self::CTX,
255    ) -> PingoraResult<()> {
256        let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
257            error!("No backend selected");
258            pingora_core::Error::new_str("No backend selected")
259        })?;
260
261        let original_uri = session.req_header().uri.clone();
262        let path = original_uri.path().to_string();
263        let query = original_uri.query();
264        let normalized_path = ctx.route.normalized_path.as_deref().unwrap_or(path.as_str());
265
266        // Check if this is a conversion request (Responses API → Chat API)
267        let is_conversion_request = (normalized_path.starts_with("/v1/responses")
268            || normalized_path.starts_with("/responses"))
269            && upstream_request.method.as_str() == "POST";
270
271        // Get the provider's chat completions path (may differ per provider)
272        // Handle Responses API paths (/v1/responses, /responses) and also Chat API paths (/v1/chat/completions)
273        let chat_api_path = if is_conversion_request || normalized_path.starts_with("/v1/chat/completions") {
274            // Get provider and use its chat_completions_path
275            if let Some(provider) = self.get_provider(&backend.name) {
276                provider.chat_completions_path()
277            } else {
278                // Fallback to standard path
279                "/v1/chat/completions".to_string()
280            }
281        } else {
282            normalized_path.to_string()
283        };
284
285        // Prepend backend's base_path (e.g., /api/coding/paas/v4 for GLM)
286        let new_path = if !backend.base_path.is_empty() {
287            format!("{}{}", backend.base_path, chat_api_path)
288        } else {
289            chat_api_path
290        };
291
292        // Build new URI
293        let new_uri_str = if let Some(q) = query {
294            format!("{}?{}", new_path, q)
295        } else {
296            new_path.clone()
297        };
298
299        let new_uri: http::Uri = new_uri_str.parse().map_err(|e| {
300            error!("URI rewrite failed: {}", e);
301            pingora_core::Error::new_str("URI rewrite failed")
302        })?;
303        upstream_request.set_uri(new_uri);
304        ctx.route.rewritten_path = Some(new_path);
305
306        // Remove client authentication headers
307        upstream_request.remove_header("x-api-key");
308        upstream_request.remove_header("authorization");
309
310        // For conversion requests, remove content-length header since body size will change
311        // The upstream will use HTTP/2 DATA frame lengths instead of content-length validation
312        if is_conversion_request {
313            upstream_request.remove_header("content-length");
314            debug!("[UPSTREAM] Removed content-length for body transformation");
315        }
316
317        // Inject API key for this backend
318        upstream_request
319            .insert_header("authorization", format!("Bearer {}", backend.api_key))
320            .map_err(|e| {
321                error!("Failed to inject authorization header: {}", e);
322                pingora_core::Error::new_str("Header injection failed")
323            })?;
324
325        // Set host header
326        upstream_request
327            .insert_header("host", &backend.host)
328            .map_err(|e| {
329                error!("Failed to set host header: {}", e);
330                pingora_core::Error::new_str("Host header failed")
331            })?;
332
333        debug!(
334            "[UPSTREAM] {} {} -> {}",
335            upstream_request.method.as_str(),
336            upstream_request.uri,
337            backend.name
338        );
339
340        Ok(())
341    }
342
343    /// Capture and transform request body (Responses API → Chat API).
344    ///
345    /// For conversion requests, we need to buffer ALL chunks and send the
346    /// converted body at end_of_stream. This is because HTTP/2 DATA frames
347    /// arrive incrementally and we can't convert partial bodies.
348    async fn request_body_filter(
349        &self,
350        _session: &mut Session,
351        body: &mut Option<Bytes>,
352        end_of_stream: bool,
353        ctx: &mut Self::CTX,
354    ) -> PingoraResult<()>
355    where
356        Self::CTX: Send + Sync,
357    {
358        // For conversion requests, buffer all chunks and suppress forwarding
359        // until we have the complete body for conversion.
360        if ctx.flags.is_conversion_request {
361            // Buffer the chunk with size limit check
362            if let Some(b) = body {
363                if ctx.buffers.request_body.len() + b.len() > MAX_REQUEST_BODY_SIZE {
364                    error!("[BODY] Request body exceeds maximum size limit of {} bytes", MAX_REQUEST_BODY_SIZE);
365                    return Err(pingora_core::Error::new_str("Request body too large"));
366                }
367                ctx.buffers.request_body.extend_from_slice(b);
368                debug!("[BODY] Buffered {} bytes (total: {})", b.len(), ctx.buffers.request_body.len());
369            }
370
371            // For conversion requests, suppress forwarding by returning empty body
372            // At end_of_stream, we'll send the converted body
373            *body = Some(Bytes::new());
374
375            // Only process when we have the complete body
376            if end_of_stream {
377                debug!("[BODY] Conversion request complete, {} bytes buffered", ctx.buffers.request_body.len());
378
379                match self.try_convert_request_body(ctx) {
380                    Ok(converted) => {
381                        if self.log_body {
382                            debug!(
383                                "[CONVERTED REQUEST] {}",
384                                String::from_utf8_lossy(&converted)
385                            );
386                        }
387                        let path = self.log_dir.join("converted_request.json");
388                        if std::fs::write(&path, &converted).is_ok() {
389                            debug!("[CONVERTED REQUEST SAVED] to {}", path.display());
390                        }
391                        debug!("[BODY] Sending converted body: {} bytes", converted.len());
392                        *body = Some(Bytes::from(converted));
393                    }
394                    Err(e) => {
395                        error!("[BODY] Conversion failed; aborting upstream: {}", e);
396                        let path = self.log_dir.join("codex_request_body.json");
397                        let _ = std::fs::write(&path, &ctx.buffers.request_body);
398                        return Err(pingora_core::Error::explain(
399                            pingora_core::ErrorType::HTTPStatus(400),
400                            format!("proxy conversion failed: {e}"),
401                        ));
402                    }
403                }
404            }
405
406            return Ok(());
407        }
408
409        // Non-conversion requests: pass through normally
410        if let Some(b) = body {
411            ctx.buffers.request_body.extend_from_slice(b);
412        }
413
414        // Only process when we have the complete body
415        if end_of_stream
416            && !ctx.buffers.request_body.is_empty()
417            && let Ok(json) = serde_json::from_slice::<serde_json::Value>(&ctx.buffers.request_body)
418        {
419            // Non-conversion path: extract just model/stream for diagnostics
420            ctx.init_from_passthrough_json(&json);
421        }
422
423        Ok(())
424    }
425
426    /// Handle upstream response headers.
427    async fn response_filter(
428        &self,
429        _session: &mut Session,
430        upstream_response: &mut ResponseHeader,
431        ctx: &mut Self::CTX,
432    ) -> PingoraResult<()> {
433        let status = upstream_response.status.as_u16();
434        let content_type = upstream_response
435            .headers
436            .get("content-type")
437            .and_then(|v| v.to_str().ok())
438            .unwrap_or("")
439            .to_string();
440        let is_sse = content_type.to_ascii_lowercase().contains("text/event-stream");
441        let is_success = (200..300).contains(&status);
442
443        ctx.diagnostics.upstream_status = Some(status);
444        ctx.diagnostics.upstream_content_type = Some(content_type.clone());
445        ctx.flags.should_convert_stream_response =
446            ctx.flags.is_streaming && ctx.flags.is_conversion_request && is_success && is_sse;
447
448        if ctx.flags.is_conversion_request {
449            upstream_response.remove_header("content-length");
450            debug!(
451                "[RESPONSE] removed content-length for conversion response (status={}, content_type={})",
452                status,
453                content_type
454            );
455        }
456
457        if ctx.flags.is_streaming && ctx.flags.is_conversion_request && !ctx.flags.should_convert_stream_response {
458            warn!(
459                "[RESPONSE] bypass stream conversion: status={}, content_type='{}', reason={}",
460                status,
461                content_type,
462                if !is_success {
463                    "upstream_non_2xx"
464                } else if !is_sse {
465                    "upstream_not_sse"
466                } else {
467                    "unknown"
468                }
469            );
470        }
471
472        debug!(
473            "[RESPONSE] status={}, is_streaming={}, is_conversion={}, should_convert_stream={}",
474            status,
475            ctx.flags.is_streaming,
476            ctx.flags.is_conversion_request,
477            ctx.flags.should_convert_stream_response
478        );
479        Ok(())
480    }
481
482    /// Transform response body (Chat API → Responses API for streaming).
483    fn response_body_filter(
484        &self,
485        _session: &mut Session,
486        body: &mut Option<Bytes>,
487        end_of_body: bool,
488        ctx: &mut Self::CTX,
489    ) -> PingoraResult<Option<Duration>>
490    where
491        Self::CTX: Send + Sync,
492    {
493        // Clone the body bytes for processing
494        let body_clone = body.clone();
495
496        debug!(
497            "[RESPONSE_BODY] len={:?}, end={}, is_streaming={}, is_conversion={}",
498            body_clone.as_ref().map(|b| b.len()),
499            end_of_body,
500            ctx.flags.is_streaming,
501            ctx.flags.is_conversion_request
502        );
503
504        if let Some(b) = body_clone.as_ref() {
505            // Check response body size limit for non-streaming conversion only.
506            // For streaming conversion we compact parsed bytes to avoid unbounded growth
507            // and must not switch protocol mid-stream.
508            if !ctx.flags.is_streaming
509                && ctx.flags.is_conversion_request
510                && ctx.buffers.response_body.len() + b.len() > MAX_RESPONSE_BODY_SIZE
511            {
512                warn!(
513                    "[RESPONSE_BODY] Response body exceeds maximum size limit of {} bytes",
514                    MAX_RESPONSE_BODY_SIZE
515                );
516            } else {
517                ctx.buffers.response_body.extend_from_slice(b);
518            }
519
520            // Suppress intermediate chunks for non-streaming conversion requests
521            // (the converted body will be sent at end_of_body)
522            if !ctx.flags.is_streaming && ctx.flags.is_conversion_request && !end_of_body {
523                *body = Some(Bytes::new());
524            }
525
526            // For streaming conversion responses, convert each SSE chunk
527            if ctx.flags.should_convert_stream_response {
528                // Suppress raw Chat API body immediately - it will be replaced
529                // with converted Responses API events below (or empty if no events)
530                *body = Some(Bytes::new());
531
532                // Get provider clone before mutable borrow to avoid borrow conflict
533                let provider = ctx.route.provider_name.as_ref()
534                    .and_then(|name| self.get_provider(name));
535
536                // Delegate to StreamingResponseHandler for chunk processing
537                let mut handler = StreamingResponseHandler::new(
538                    ctx,
539                    provider,
540                    self.log_body,
541                    self.conversation_store.clone(),
542                );
543
544                // Process streaming frames until end_of_body
545                if let Some(converted) = handler.process_stream_frame() {
546                    *body = Some(Bytes::from(converted));
547                }
548
549                // For streaming responses, append response.completed event at end_of_body
550                if end_of_body {
551                    let completed_events = handler.finalize_stream();
552                    if !completed_events.is_empty() {
553                        let existing = std::str::from_utf8(body.as_ref().unwrap_or(&Bytes::new()))
554                            .unwrap_or("")
555                            .to_string();
556                        let combined = format!("{}{}", existing, completed_events.join(""));
557                        *body = Some(Bytes::from(combined));
558                    }
559                }
560            }
561        }
562
563        if end_of_body {
564            let duration_ms = ctx.start_time.elapsed().as_millis() as u64;
565
566            // For non-streaming conversion responses, convert the full body.
567            // Fail-Fast: a parse/convert error here means the proxy cannot
568            // honour its protocol contract, so surface an error rather than
569            // silently delivering the raw Chat body (which the client expects
570            // in Responses-API shape).
571            if !ctx.flags.is_streaming && ctx.flags.is_conversion_request && !ctx.buffers.response_body.is_empty() {
572                let text = std::str::from_utf8(&ctx.buffers.response_body).map_err(|e| {
573                    error!("[RESPONSE_BODY] upstream body not valid UTF-8: {}", e);
574                    pingora_core::Error::explain(
575                        pingora_core::ErrorType::HTTPStatus(502),
576                        format!("upstream response is not valid UTF-8: {e}"),
577                    )
578                })?;
579                let chat_resp: crate::types::chat_api::ChatResponse =
580                    serde_json::from_str(text).map_err(|e| {
581                        error!("[RESPONSE_BODY] failed to parse upstream ChatResponse: {}", e);
582                        pingora_core::Error::explain(
583                            pingora_core::ErrorType::HTTPStatus(502),
584                            format!("upstream response not a valid Chat completion: {e}"),
585                        )
586                    })?;
587                let assistant_message = chat_resp.choices.first().map(|c| c.message.clone());
588                let request_context = ctx
589                    .stream_state
590                    .as_ref()
591                    .and_then(|s| s.request_context.as_ref());
592                let response_obj =
593                    crate::convert::chat_to_response_with_context(chat_resp, request_context)
594                        .map_err(|e| {
595                            error!("[RESPONSE_BODY] failed to convert response: {}", e);
596                            pingora_core::Error::explain(
597                                pingora_core::ErrorType::HTTPStatus(500),
598                                format!("proxy response conversion failed: {e}"),
599                            )
600                        })?;
601                let converted = serde_json::to_vec(&response_obj).map_err(|e| {
602                    error!("[RESPONSE_BODY] failed to serialize converted response: {}", e);
603                    pingora_core::Error::explain(
604                        pingora_core::ErrorType::HTTPStatus(500),
605                        format!("proxy response serialization failed: {e}"),
606                    )
607                })?;
608                if self.log_body {
609                    debug!(
610                        "[CONVERTED RESPONSE] {}",
611                        String::from_utf8_lossy(&converted)
612                    );
613                }
614                *body = Some(Bytes::from(converted));
615                if let (Some(mut messages), Some(assistant_message)) = (
616                    ctx.follow_up.pending_conversation_messages.clone(),
617                    assistant_message,
618                ) {
619                    messages.push(assistant_message);
620                    self.store_conversation(
621                        response_obj.id.clone(),
622                        ConversationSnapshot {
623                            instructions: ctx.follow_up.pending_instructions.clone(),
624                            messages,
625                        },
626                    );
627                }
628            }
629
630            info!(
631                "[DONE] provider={}, model={:?}, duration={}ms",
632                ctx.route.provider_name.as_deref().unwrap_or("unknown"),
633                ctx.model.as_ref(),
634                duration_ms
635            );
636        }
637
638        Ok(None)
639    }
640
641    /// Called when connected to upstream.
642    async fn connected_to_upstream(
643        &self,
644        _session: &mut Session,
645        reused: bool,
646        peer: &HttpPeer,
647        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
648        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
649        digest: Option<&Digest>,
650        ctx: &mut Self::CTX,
651    ) -> PingoraResult<()> {
652        let tls_version = digest
653            .and_then(|d| d.ssl_digest.as_ref())
654            .map(|ssl| ssl.version.to_string())
655            .unwrap_or_else(|| "none".to_string());
656
657        let use_tls = ctx.route.selected_backend.as_ref().map(|b| b.use_tls).unwrap_or(false);
658        let backend_name = ctx.route.provider_name.as_deref().unwrap_or("unknown");
659
660        info!(
661            "[CONNECT] {} -> {} (backend={}, TLS={}, reused={}, tls_version={})",
662            peer.sni(),
663            peer.address(),
664            backend_name,
665            use_tls,
666            reused,
667            tls_version
668        );
669
670        Ok(())
671    }
672
673    /// Handle proxy errors.
674    fn error_while_proxy(
675        &self,
676        peer: &HttpPeer,
677        _session: &mut Session,
678        e: Box<pingora_core::Error>,
679        ctx: &mut Self::CTX,
680        _client_reused: bool,
681    ) -> Box<pingora_core::Error> {
682        error!(
683            "[ERROR] proxy error to {}: {}",
684            peer.address(),
685            e
686        );
687
688        let mut e = e.more_context(format!("Provider: {}", ctx.route.provider_name.as_deref().unwrap_or("unknown")));
689        e.retry.decide_reuse(false);
690        e
691    }
692
693    /// Handle fatal errors when proxy cannot be established.
694    async fn fail_to_proxy(
695        &self,
696        session: &mut Session,
697        e: &pingora_core::Error,
698        ctx: &mut Self::CTX,
699    ) -> pingora_proxy::FailToProxy
700    where
701        Self::CTX: Send + Sync,
702    {
703        let code = match *e.etype() {
704            pingora_core::ErrorType::ConnectTimedout => 504,
705            pingora_core::ErrorType::ConnectRefused => 502,
706            pingora_core::ErrorType::TLSHandshakeFailure => 502,
707            _ => 502,
708        };
709
710        let method = session.req_header().method.as_str();
711        let uri = &session.req_header().uri;
712
713        error!(
714            "[FAIL] {} {} -> {} (provider: {}, model: {:?}): {}",
715            method,
716            uri,
717            code,
718            ctx.route.provider_name.as_deref().unwrap_or("unknown"),
719            ctx.model.as_ref(),
720            e
721        );
722
723        // Return error response (use serde_json for proper escaping)
724        let error_body = serde_json::json!({
725            "error": {
726                "type": "proxy_error",
727                "code": code,
728                "message": e.to_string()
729            }
730        })
731        .to_string();
732        if let Ok(mut resp) = pingora_http::ResponseHeader::build(code, None) {
733            let _ = resp.insert_header("content-type", "application/json");
734            let _ = resp.insert_header("content-length", error_body.len().to_string());
735            let _ = session.write_response_header(Box::new(resp), false).await;
736            let _ = session
737                .write_response_body(Some(bytes::Bytes::from(error_body)), true)
738                .await;
739        }
740
741        pingora_proxy::FailToProxy {
742            error_code: code,
743            can_reuse_downstream: false,
744        }
745    }
746}
747
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::config::{BackendConfig, MatchRules};
    use crate::types::chat_api::{ChatMessage, Content, MessageRole};
    use crate::providers::GLMProvider;
    use crate::providers::Provider;

    use super::CodexProxy;

    /// Builds a `CodexProxy` with a single default `glm` backend served by
    /// `GLMProvider`; `log_body` is passed straight through to `CodexProxy::new`.
    ///
    /// This is the single source of the backend fixture, so every test exercises
    /// the same routing configuration.
    fn make_test_proxy_with_log_body(log_body: bool) -> CodexProxy {
        let configs = vec![BackendConfig {
            name: "glm".to_string(),
            url: "https://api.example.com".to_string(),
            api_key: "test-key".to_string(),
            protocol: "openai".to_string(),
            model: None,
            match_rules: MatchRules {
                default: true,
                ..Default::default()
            },
        }];
        let router = Arc::new(
            crate::config::BackendRouter::new(configs)
                .expect("static test backend config must be accepted by BackendRouter"),
        );
        let mut providers = HashMap::new();
        providers.insert("glm".to_string(), Arc::new(GLMProvider) as Arc<dyn Provider>);
        CodexProxy::new(router, providers, log_body, std::path::PathBuf::from("logs"))
    }

    /// Test proxy with `log_body` set to `false`.
    fn make_test_proxy() -> CodexProxy {
        make_test_proxy_with_log_body(false)
    }

    /// Builds a context flagged as a conversion request, routed to the proxy's
    /// default backend, with `body` as the buffered request body.
    ///
    /// # Panics
    ///
    /// Panics if the router selects no backend. A broken routing fixture then
    /// fails here with a clear message, instead of surfacing as an unrelated
    /// `ProviderError` inside the conversion assertions.
    fn make_conversion_ctx(proxy: &CodexProxy, body: &[u8]) -> super::ProxyContext {
        let mut ctx = super::ProxyContext::new();
        ctx.flags.is_conversion_request = true;
        ctx.route.selected_backend = proxy
            .router
            .select_with_config("/", &[])
            .map(|(_, info)| info.clone());
        assert!(
            ctx.route.selected_backend.is_some(),
            "test router must select the default backend"
        );
        ctx.buffers.request_body = body.to_vec();
        ctx
    }

    #[test]
    fn test_proxy_creation() {
        // The log_body flag must be stored exactly as passed to the constructor.
        assert!(make_test_proxy_with_log_body(true).log_body);
        assert!(!make_test_proxy().log_body);
    }

    #[test]
    fn test_try_convert_request_body_rejects_malformed_json() {
        // Fail-Fast: a body that doesn't deserialize as ResponseRequest must
        // produce a ConversionError rather than silently being passed upstream.
        let proxy = make_test_proxy();
        let mut ctx = make_conversion_ctx(&proxy, b"{not valid json");
        let err = proxy
            .try_convert_request_body(&mut ctx)
            .expect_err("must fail on invalid JSON");
        assert!(matches!(err, crate::error::ConversionError::JsonError(_)));
    }

    #[test]
    fn test_try_convert_request_body_succeeds_on_minimal_request() {
        let proxy = make_test_proxy();
        let mut ctx = make_conversion_ctx(&proxy, br#"{"model":"glm-4","input":"hi"}"#);
        let bytes = proxy
            .try_convert_request_body(&mut ctx)
            .expect("conversion should succeed");
        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(json["model"], "glm-4");
        assert!(json["messages"].is_array());
    }

    /// Builds a plain-text `ChatMessage` with the given role and every optional field unset.
    fn msg(role: MessageRole, text: &str) -> ChatMessage {
        ChatMessage {
            role,
            content: Content::String(text.to_string()),
            name: None,
            annotations: None,
            tool_calls: None,
            function_call: None,
            tool_call_id: None,
            refusal: None,
        }
    }

    #[test]
    fn test_merge_history_messages_prefers_previous_and_appends_incremental() {
        let history = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "hello"),
            msg(MessageRole::Assistant, "hi"),
        ];
        let current = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "hello"),
            msg(MessageRole::Assistant, "hi"),
            msg(MessageRole::User, "next question"),
        ];

        let merged = super::merge_history_messages(history, current);
        assert_eq!(merged.len(), 4);
        assert_eq!(merged[3].content.as_text(), "next question");
    }

    #[test]
    fn test_merge_history_messages_when_no_overlap_appends_all_current() {
        let history = vec![msg(MessageRole::System, "system"), msg(MessageRole::Assistant, "a1")];
        let current = vec![msg(MessageRole::User, "new question")];

        let merged = super::merge_history_messages(history, current);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[2].content.as_text(), "new question");
    }
}