Skip to main content

codex_convert_proxy/proxy/
filters.rs

1//! Pingora ProxyHttp trait implementation.
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use pingora_core::protocols::Digest;
8use pingora_core::protocols::TcpKeepalive;
9use pingora_core::Result as PingoraResult;
10use pingora_core::upstreams::peer::{ALPN, HttpPeer, Peer};
11use pingora_http::{RequestHeader, ResponseHeader};
12use pingora_proxy::{ProxyHttp, Session};
13use tracing::{debug, error, info, warn};
14
15use crate::constants::*;
16use crate::convert::{ResponseRequestContext, response_to_chat};
17use crate::proxy::context_store::ConversationSnapshot;
18use crate::types::chat_api::{ChatMessage, MessageRole};
19use crate::types::response_api::ResponseRequest;
20
21use super::context::ProxyContext;
22use super::core::CodexProxy;
23use crate::proxy::streaming_handler::StreamingResponseHandler;
24
25impl CodexProxy {
26    /// Convert a buffered Responses-API request body to a Chat-API body.
27    ///
28    /// Returns `Err(ConversionError)` if any of:
29    /// - no provider is configured for the selected backend
30    /// - body cannot be parsed as `ResponseRequest`
31    /// - protocol conversion fails
32    /// - the converted `ChatRequest` cannot be serialised
33    ///
34    /// Callers should propagate the error to the client as a 4xx response
35    /// (Fail-Fast — earlier behaviour silently passed the original
36    /// Responses-API body to a Chat endpoint and let the upstream return 400,
37    /// which obscured proxy bugs).
38    fn try_convert_request_body(
39        &self,
40        ctx: &mut ProxyContext,
41    ) -> Result<Vec<u8>, crate::error::ConversionError> {
42        use crate::error::ConversionError;
43
44        let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
45            ConversionError::ProviderError("no backend selected".to_string())
46        })?;
47        let model_override = backend.model.clone();
48        let provider = self.get_provider(&backend.name).ok_or_else(|| {
49            ConversionError::ProviderError(format!(
50                "no provider registered for backend '{}'",
51                backend.name
52            ))
53        })?;
54
55        let mut response_req: ResponseRequest =
56            serde_json::from_slice(&ctx.buffers.request_body)?;
57        ctx.init_from_response_request(&response_req);
58
59        // Resolve previous-turn history if requested.
60        let mut previous_messages: Option<Vec<ChatMessage>> = None;
61        if let Some(prev_id) = response_req.previous_response_id.clone() {
62            if let Some(snapshot) = self.get_conversation(&prev_id) {
63                if matches!(
64                    &response_req.input,
65                    crate::types::response_api::InputItemOrString::Array(_)
66                ) {
67                    debug!(
68                        "[REQUEST_CONVERT] previous_response_id + input[] detected, applying prefer-previous merge policy"
69                    );
70                }
71                if response_req.instructions.is_none() {
72                    response_req.instructions = snapshot.instructions.clone();
73                }
74                previous_messages = Some(snapshot.messages);
75            } else {
76                warn!(
77                    "[REQUEST_CONVERT] previous_response_id not found in context store: {}",
78                    prev_id
79                );
80            }
81        }
82
83        let context = ResponseRequestContext::from(&response_req);
84        ctx.set_response_request_context(context);
85
86        let mut chat_req = response_to_chat(
87            response_req,
88            provider.as_ref(),
89            model_override.as_deref(),
90        )?;
91
92        if let Some(history) = previous_messages {
93            chat_req.messages = merge_history_messages(history, chat_req.messages);
94        }
95
96        ctx.follow_up.pending_instructions = chat_req
97            .messages
98            .iter()
99            .find(|m| m.role == MessageRole::System)
100            .map(|m| m.content.as_text());
101        ctx.follow_up.pending_conversation_messages = Some(chat_req.messages.clone());
102
103        serde_json::to_vec(&chat_req).map_err(ConversionError::from)
104    }
105}
106
107fn merge_history_messages(
108    mut history: Vec<ChatMessage>,
109    current_turn_messages: Vec<ChatMessage>,
110) -> Vec<ChatMessage> {
111    // prefer-previous strategy:
112    // history from previous_response_id is authoritative; only append incremental suffix.
113    let mut overlap = 0usize;
114    while overlap < history.len() && overlap < current_turn_messages.len() {
115        let same = serde_json::to_value(&history[overlap]).ok()
116            == serde_json::to_value(&current_turn_messages[overlap]).ok();
117        if !same {
118            break;
119        }
120        overlap += 1;
121    }
122
123    if overlap > 0 {
124        debug!(
125            "[REQUEST_CONVERT] detected {} overlapping history messages, appending incremental suffix only",
126            overlap
127        );
128    } else if !current_turn_messages.is_empty() {
129        debug!(
130            "[REQUEST_CONVERT] no overlap with cached history, appending all current messages as incremental"
131        );
132    }
133
134    history.extend(current_turn_messages.into_iter().skip(overlap));
135    history
136}
137
138#[async_trait]
139impl ProxyHttp for CodexProxy {
140    type CTX = ProxyContext;
141
142    fn new_ctx(&self) -> Self::CTX {
143        ProxyContext::new()
144    }
145
146    /// Request filter - called for each request to select backend and prepare context.
147    async fn request_filter(
148        &self,
149        session: &mut Session,
150        ctx: &mut Self::CTX,
151    ) -> PingoraResult<bool>
152    where
153        Self::CTX: Send + Sync,
154    {
155        let method = session.req_header().method.as_str().to_string();
156        let path = session.req_header().uri.path().to_string();
157
158        // Collect headers for routing
159        let headers: Vec<(String, String)> = session
160            .req_header()
161            .headers
162            .iter()
163            .map(|(name, value)| {
164                (
165                    name.as_str().to_string(),
166                    value.to_str().unwrap_or("<binary>").to_string(),
167                )
168            })
169            .collect();
170
171        // Select backend with config to support path_prefix stripping.
172        let (backend_config, backend) = match self.router.select_with_config(&path, &headers) {
173            Some(pair) => pair,
174            None => {
175                warn!("[REQUEST] No matching backend for path: {}", path);
176                return Err(pingora_core::Error::new_str("No matching backend"));
177            }
178        };
179
180        let normalized_path = if let Some(prefix) = backend_config.match_rules.path_prefix.as_deref() {
181            let stripped = path.strip_prefix(prefix).unwrap_or(path.as_str());
182            if stripped.is_empty() {
183                "/".to_string()
184            } else if stripped.starts_with('/') {
185                stripped.to_string()
186            } else {
187                format!("/{}", stripped)
188            }
189        } else {
190            path.clone()
191        };
192        ctx.route.normalized_path = Some(normalized_path.clone());
193
194        // Check if this is a conversion request (Responses API -> Chat API)
195        let is_conversion = (normalized_path.starts_with("/v1/responses") || normalized_path.starts_with("/responses")) && method == "POST";
196        ctx.flags.is_conversion_request = is_conversion;
197
198        if is_conversion {
199            debug!("[REQUEST] {} {} -> {} (CONVERSION)", method, normalized_path, "conversion");
200        }
201
202        ctx.route.selected_backend = Some(backend.clone());
203        ctx.route.provider_name = Some(backend.name.clone());
204
205        debug!("[REQUEST] {} {} -> {}", method, normalized_path, backend.name);
206
207        // Return false to continue processing
208        Ok(false)
209    }
210
211    /// Select upstream peer for proxying.
212    async fn upstream_peer(
213        &self,
214        _session: &mut Session,
215        ctx: &mut Self::CTX,
216    ) -> PingoraResult<Box<HttpPeer>> {
217        let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
218            error!("No backend selected");
219            pingora_core::Error::new_str("No backend selected")
220        })?;
221
222        let mut peer = HttpPeer::new(
223            (backend.host.as_str(), backend.port),
224            backend.use_tls,
225            backend.host.clone(),
226        );
227
228        // HTTP/2 priority
229        peer.options.alpn = ALPN::H2H1;
230
231        // Connection configuration
232        peer.options.connection_timeout = Some(Duration::from_secs(10));
233        peer.options.total_connection_timeout = Some(Duration::from_secs(30));
234        peer.options.idle_timeout = Some(Duration::from_secs(90));
235        peer.options.tcp_keepalive = Some(TcpKeepalive {
236            idle: Duration::from_secs(60),
237            interval: Duration::from_secs(5),
238            count: 5,
239        });
240
241        if backend.use_tls {
242            peer.options.h2_ping_interval = Some(Duration::from_secs(30));
243        }
244
245        Ok(Box::new(peer))
246    }
247
248    /// Filter and modify the upstream request.
249    async fn upstream_request_filter(
250        &self,
251        session: &mut Session,
252        upstream_request: &mut RequestHeader,
253        ctx: &mut Self::CTX,
254    ) -> PingoraResult<()> {
255        let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
256            error!("No backend selected");
257            pingora_core::Error::new_str("No backend selected")
258        })?;
259
260        let original_uri = session.req_header().uri.clone();
261        let path = original_uri.path().to_string();
262        let query = original_uri.query();
263        let normalized_path = ctx.route.normalized_path.as_deref().unwrap_or(path.as_str());
264
265        // Check if this is a conversion request (Responses API → Chat API)
266        let is_conversion_request = (normalized_path.starts_with("/v1/responses")
267            || normalized_path.starts_with("/responses"))
268            && upstream_request.method.as_str() == "POST";
269
270        // Get the provider's chat completions path (may differ per provider)
271        // Handle Responses API paths (/v1/responses, /responses) and also Chat API paths (/v1/chat/completions)
272        let chat_api_path = if is_conversion_request || normalized_path.starts_with("/v1/chat/completions") {
273            // Get provider and use its chat_completions_path
274            if let Some(provider) = self.get_provider(&backend.name) {
275                provider.chat_completions_path()
276            } else {
277                // Fallback to standard path
278                "/v1/chat/completions".to_string()
279            }
280        } else {
281            normalized_path.to_string()
282        };
283
284        // Prepend backend's base_path (e.g., /api/coding/paas/v4 for GLM)
285        let new_path = if !backend.base_path.is_empty() {
286            format!("{}{}", backend.base_path, chat_api_path)
287        } else {
288            chat_api_path
289        };
290
291        // Build new URI
292        let new_uri_str = if let Some(q) = query {
293            format!("{}?{}", new_path, q)
294        } else {
295            new_path.clone()
296        };
297
298        let new_uri: http::Uri = new_uri_str.parse().map_err(|e| {
299            error!("URI rewrite failed: {}", e);
300            pingora_core::Error::new_str("URI rewrite failed")
301        })?;
302        upstream_request.set_uri(new_uri);
303        ctx.route.rewritten_path = Some(new_path);
304
305        // Remove client authentication headers
306        upstream_request.remove_header("x-api-key");
307        upstream_request.remove_header("authorization");
308
309        // For conversion requests, remove content-length header since body size will change
310        // The upstream will use HTTP/2 DATA frame lengths instead of content-length validation
311        if is_conversion_request {
312            upstream_request.remove_header("content-length");
313            debug!("[UPSTREAM] Removed content-length for body transformation");
314        }
315
316        // Inject API key for this backend
317        upstream_request
318            .insert_header("authorization", format!("Bearer {}", backend.api_key))
319            .map_err(|e| {
320                error!("Failed to inject authorization header: {}", e);
321                pingora_core::Error::new_str("Header injection failed")
322            })?;
323
324        // Set host header
325        upstream_request
326            .insert_header("host", &backend.host)
327            .map_err(|e| {
328                error!("Failed to set host header: {}", e);
329                pingora_core::Error::new_str("Host header failed")
330            })?;
331
332        debug!(
333            "[UPSTREAM] {} {} -> {}",
334            upstream_request.method.as_str(),
335            upstream_request.uri,
336            backend.name
337        );
338
339        Ok(())
340    }
341
342    /// Capture and transform request body (Responses API → Chat API).
343    ///
344    /// For conversion requests, we need to buffer ALL chunks and send the
345    /// converted body at end_of_stream. This is because HTTP/2 DATA frames
346    /// arrive incrementally and we can't convert partial bodies.
347    async fn request_body_filter(
348        &self,
349        _session: &mut Session,
350        body: &mut Option<Bytes>,
351        end_of_stream: bool,
352        ctx: &mut Self::CTX,
353    ) -> PingoraResult<()>
354    where
355        Self::CTX: Send + Sync,
356    {
357        // For conversion requests, buffer all chunks and suppress forwarding
358        // until we have the complete body for conversion.
359        if ctx.flags.is_conversion_request {
360            // Buffer the chunk with size limit check
361            if let Some(b) = body {
362                if ctx.buffers.request_body.len() + b.len() > MAX_REQUEST_BODY_SIZE {
363                    error!("[BODY] Request body exceeds maximum size limit of {} bytes", MAX_REQUEST_BODY_SIZE);
364                    return Err(pingora_core::Error::new_str("Request body too large"));
365                }
366                ctx.buffers.request_body.extend_from_slice(b);
367                debug!("[BODY] Buffered {} bytes (total: {})", b.len(), ctx.buffers.request_body.len());
368            }
369
370            // For conversion requests, suppress forwarding by returning empty body
371            // At end_of_stream, we'll send the converted body
372            *body = Some(Bytes::new());
373
374            // Only process when we have the complete body
375            if end_of_stream {
376                debug!("[BODY] Conversion request complete, {} bytes buffered", ctx.buffers.request_body.len());
377
378                match self.try_convert_request_body(ctx) {
379                    Ok(converted) => {
380                        if self.log_body {
381                            debug!(
382                                "[CONVERTED REQUEST] {}",
383                                String::from_utf8_lossy(&converted)
384                            );
385                        }
386                        let path = self.log_dir.join("converted_request.json");
387                        if std::fs::write(&path, &converted).is_ok() {
388                            debug!("[CONVERTED REQUEST SAVED] to {}", path.display());
389                        }
390                        debug!("[BODY] Sending converted body: {} bytes", converted.len());
391                        *body = Some(Bytes::from(converted));
392                    }
393                    Err(e) => {
394                        error!("[BODY] Conversion failed; aborting upstream: {}", e);
395                        let path = self.log_dir.join("codex_request_body.json");
396                        let _ = std::fs::write(&path, &ctx.buffers.request_body);
397                        return Err(pingora_core::Error::explain(
398                            pingora_core::ErrorType::HTTPStatus(400),
399                            format!("proxy conversion failed: {e}"),
400                        ));
401                    }
402                }
403            }
404
405            return Ok(());
406        }
407
408        // Non-conversion requests: pass through normally
409        if let Some(b) = body {
410            ctx.buffers.request_body.extend_from_slice(b);
411        }
412
413        // Only process when we have the complete body
414        if end_of_stream
415            && !ctx.buffers.request_body.is_empty()
416            && let Ok(json) = serde_json::from_slice::<serde_json::Value>(&ctx.buffers.request_body)
417        {
418            // Non-conversion path: extract just model/stream for diagnostics
419            ctx.init_from_passthrough_json(&json);
420        }
421
422        Ok(())
423    }
424
425    /// Handle upstream response headers.
426    async fn response_filter(
427        &self,
428        _session: &mut Session,
429        upstream_response: &mut ResponseHeader,
430        ctx: &mut Self::CTX,
431    ) -> PingoraResult<()> {
432        let status = upstream_response.status.as_u16();
433        let content_type = upstream_response
434            .headers
435            .get("content-type")
436            .and_then(|v| v.to_str().ok())
437            .unwrap_or("")
438            .to_string();
439        let is_sse = content_type.to_ascii_lowercase().contains("text/event-stream");
440        let is_success = (200..300).contains(&status);
441
442        ctx.diagnostics.upstream_status = Some(status);
443        ctx.diagnostics.upstream_content_type = Some(content_type.clone());
444        ctx.flags.should_convert_stream_response =
445            ctx.flags.is_streaming && ctx.flags.is_conversion_request && is_success && is_sse;
446
447        if ctx.flags.is_conversion_request {
448            upstream_response.remove_header("content-length");
449            debug!(
450                "[RESPONSE] removed content-length for conversion response (status={}, content_type={})",
451                status,
452                content_type
453            );
454        }
455
456        if ctx.flags.is_streaming && ctx.flags.is_conversion_request && !ctx.flags.should_convert_stream_response {
457            warn!(
458                "[RESPONSE] bypass stream conversion: status={}, content_type='{}', reason={}",
459                status,
460                content_type,
461                if !is_success {
462                    "upstream_non_2xx"
463                } else if !is_sse {
464                    "upstream_not_sse"
465                } else {
466                    "unknown"
467                }
468            );
469        }
470
471        debug!(
472            "[RESPONSE] status={}, is_streaming={}, is_conversion={}, should_convert_stream={}",
473            status,
474            ctx.flags.is_streaming,
475            ctx.flags.is_conversion_request,
476            ctx.flags.should_convert_stream_response
477        );
478        Ok(())
479    }
480
481    /// Transform response body (Chat API → Responses API for streaming).
482    fn response_body_filter(
483        &self,
484        _session: &mut Session,
485        body: &mut Option<Bytes>,
486        end_of_body: bool,
487        ctx: &mut Self::CTX,
488    ) -> PingoraResult<Option<Duration>>
489    where
490        Self::CTX: Send + Sync,
491    {
492        // Clone the body bytes for processing
493        let body_clone = body.clone();
494
495        debug!(
496            "[RESPONSE_BODY] len={:?}, end={}, is_streaming={}, is_conversion={}",
497            body_clone.as_ref().map(|b| b.len()),
498            end_of_body,
499            ctx.flags.is_streaming,
500            ctx.flags.is_conversion_request
501        );
502
503        if let Some(b) = body_clone.as_ref() {
504            // Check response body size limit for non-streaming conversion only.
505            // For streaming conversion we compact parsed bytes to avoid unbounded growth
506            // and must not switch protocol mid-stream.
507            if !ctx.flags.is_streaming
508                && ctx.flags.is_conversion_request
509                && ctx.buffers.response_body.len() + b.len() > MAX_RESPONSE_BODY_SIZE
510            {
511                warn!(
512                    "[RESPONSE_BODY] Response body exceeds maximum size limit of {} bytes",
513                    MAX_RESPONSE_BODY_SIZE
514                );
515            } else {
516                ctx.buffers.response_body.extend_from_slice(b);
517            }
518
519            // Suppress intermediate chunks for non-streaming conversion requests
520            // (the converted body will be sent at end_of_body)
521            if !ctx.flags.is_streaming && ctx.flags.is_conversion_request && !end_of_body {
522                *body = Some(Bytes::new());
523            }
524
525            // For streaming conversion responses, convert each SSE chunk
526            if ctx.flags.should_convert_stream_response {
527                // Suppress raw Chat API body immediately - it will be replaced
528                // with converted Responses API events below (or empty if no events)
529                *body = Some(Bytes::new());
530
531                // Get provider clone before mutable borrow to avoid borrow conflict
532                let provider = ctx.route.provider_name.as_ref()
533                    .and_then(|name| self.get_provider(name));
534
535                // Delegate to StreamingResponseHandler for chunk processing
536                let mut handler = StreamingResponseHandler::new(
537                    ctx,
538                    provider,
539                    self.log_body,
540                    self.conversation_store.clone(),
541                );
542
543                // Process streaming frames until end_of_body
544                if let Some(converted) = handler.process_stream_frame() {
545                    *body = Some(Bytes::from(converted));
546                }
547
548                // For streaming responses, append response.completed event at end_of_body
549                if end_of_body {
550                    let completed_events = handler.finalize_stream();
551                    if !completed_events.is_empty() {
552                        let existing = std::str::from_utf8(body.as_ref().unwrap_or(&Bytes::new()))
553                            .unwrap_or("")
554                            .to_string();
555                        let combined = format!("{}{}", existing, completed_events.join(""));
556                        *body = Some(Bytes::from(combined));
557                    }
558                }
559            }
560        }
561
562        if end_of_body {
563            let duration_ms = ctx.start_time.elapsed().as_millis() as u64;
564
565            // For non-streaming conversion responses, convert the full body.
566            // Fail-Fast: a parse/convert error here means the proxy cannot
567            // honour its protocol contract, so surface an error rather than
568            // silently delivering the raw Chat body (which the client expects
569            // in Responses-API shape).
570            if !ctx.flags.is_streaming && ctx.flags.is_conversion_request && !ctx.buffers.response_body.is_empty() {
571                let text = std::str::from_utf8(&ctx.buffers.response_body).map_err(|e| {
572                    error!("[RESPONSE_BODY] upstream body not valid UTF-8: {}", e);
573                    pingora_core::Error::explain(
574                        pingora_core::ErrorType::HTTPStatus(502),
575                        format!("upstream response is not valid UTF-8: {e}"),
576                    )
577                })?;
578                let chat_resp: crate::types::chat_api::ChatResponse =
579                    serde_json::from_str(text).map_err(|e| {
580                        error!("[RESPONSE_BODY] failed to parse upstream ChatResponse: {}", e);
581                        pingora_core::Error::explain(
582                            pingora_core::ErrorType::HTTPStatus(502),
583                            format!("upstream response not a valid Chat completion: {e}"),
584                        )
585                    })?;
586                let assistant_message = chat_resp.choices.first().map(|c| c.message.clone());
587                let request_context = ctx
588                    .stream_state
589                    .as_ref()
590                    .and_then(|s| s.request_context.as_ref());
591                let response_obj =
592                    crate::convert::chat_to_response_with_context(chat_resp, request_context)
593                        .map_err(|e| {
594                            error!("[RESPONSE_BODY] failed to convert response: {}", e);
595                            pingora_core::Error::explain(
596                                pingora_core::ErrorType::HTTPStatus(500),
597                                format!("proxy response conversion failed: {e}"),
598                            )
599                        })?;
600                let converted = serde_json::to_vec(&response_obj).map_err(|e| {
601                    error!("[RESPONSE_BODY] failed to serialize converted response: {}", e);
602                    pingora_core::Error::explain(
603                        pingora_core::ErrorType::HTTPStatus(500),
604                        format!("proxy response serialization failed: {e}"),
605                    )
606                })?;
607                if self.log_body {
608                    debug!(
609                        "[CONVERTED RESPONSE] {}",
610                        String::from_utf8_lossy(&converted)
611                    );
612                }
613                *body = Some(Bytes::from(converted));
614                if let (Some(mut messages), Some(assistant_message)) = (
615                    ctx.follow_up.pending_conversation_messages.clone(),
616                    assistant_message,
617                ) {
618                    messages.push(assistant_message);
619                    self.store_conversation(
620                        response_obj.id.clone(),
621                        ConversationSnapshot {
622                            instructions: ctx.follow_up.pending_instructions.clone(),
623                            messages,
624                        },
625                    );
626                }
627            }
628
629            info!(
630                "[DONE] provider={}, model={:?}, duration={}ms",
631                ctx.route.provider_name.as_deref().unwrap_or("unknown"),
632                ctx.model.as_ref(),
633                duration_ms
634            );
635        }
636
637        Ok(None)
638    }
639
640    /// Called when connected to upstream.
641    async fn connected_to_upstream(
642        &self,
643        _session: &mut Session,
644        reused: bool,
645        peer: &HttpPeer,
646        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
647        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
648        digest: Option<&Digest>,
649        ctx: &mut Self::CTX,
650    ) -> PingoraResult<()> {
651        let tls_version = digest
652            .and_then(|d| d.ssl_digest.as_ref())
653            .map(|ssl| ssl.version.to_string())
654            .unwrap_or_else(|| "none".to_string());
655
656        let use_tls = ctx.route.selected_backend.as_ref().map(|b| b.use_tls).unwrap_or(false);
657        let backend_name = ctx.route.provider_name.as_deref().unwrap_or("unknown");
658
659        info!(
660            "[CONNECT] {} -> {} (backend={}, TLS={}, reused={}, tls_version={})",
661            peer.sni(),
662            peer.address(),
663            backend_name,
664            use_tls,
665            reused,
666            tls_version
667        );
668
669        Ok(())
670    }
671
672    /// Handle proxy errors.
673    fn error_while_proxy(
674        &self,
675        peer: &HttpPeer,
676        _session: &mut Session,
677        e: Box<pingora_core::Error>,
678        ctx: &mut Self::CTX,
679        _client_reused: bool,
680    ) -> Box<pingora_core::Error> {
681        error!(
682            "[ERROR] proxy error to {}: {}",
683            peer.address(),
684            e
685        );
686
687        let mut e = e.more_context(format!("Provider: {}", ctx.route.provider_name.as_deref().unwrap_or("unknown")));
688        e.retry.decide_reuse(false);
689        e
690    }
691
692    /// Handle fatal errors when proxy cannot be established.
693    async fn fail_to_proxy(
694        &self,
695        session: &mut Session,
696        e: &pingora_core::Error,
697        ctx: &mut Self::CTX,
698    ) -> pingora_proxy::FailToProxy
699    where
700        Self::CTX: Send + Sync,
701    {
702        let code = match *e.etype() {
703            pingora_core::ErrorType::ConnectTimedout => 504,
704            pingora_core::ErrorType::ConnectRefused => 502,
705            pingora_core::ErrorType::TLSHandshakeFailure => 502,
706            _ => 502,
707        };
708
709        let method = session.req_header().method.as_str();
710        let uri = &session.req_header().uri;
711
712        error!(
713            "[FAIL] {} {} -> {} (provider: {}, model: {:?}): {}",
714            method,
715            uri,
716            code,
717            ctx.route.provider_name.as_deref().unwrap_or("unknown"),
718            ctx.model.as_ref(),
719            e
720        );
721
722        // Return error response (use serde_json for proper escaping)
723        let error_body = serde_json::json!({
724            "error": {
725                "type": "proxy_error",
726                "code": code,
727                "message": e.to_string()
728            }
729        })
730        .to_string();
731        if let Ok(mut resp) = pingora_http::ResponseHeader::build(code, None) {
732            let _ = resp.insert_header("content-type", "application/json");
733            let _ = resp.insert_header("content-length", error_body.len().to_string());
734            let _ = session.write_response_header(Box::new(resp), false).await;
735            let _ = session
736                .write_response_body(Some(bytes::Bytes::from(error_body)), true)
737                .await;
738        }
739
740        pingora_proxy::FailToProxy {
741            error_code: code,
742            can_reuse_downstream: false,
743        }
744    }
745}
746
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::config::{BackendConfig, MatchRules};
    use crate::providers::GLMProvider;
    use crate::providers::Provider;
    use crate::types::chat_api::{ChatMessage, Content, MessageRole};

    use super::CodexProxy;

    /// Build a proxy with a single default `glm` backend served by `GLMProvider`.
    ///
    /// `log_body` is forwarded unchanged to `CodexProxy::new`, so tests can cover
    /// either setting without duplicating the backend/router/provider setup.
    fn make_test_proxy_with_log_body(log_body: bool) -> CodexProxy {
        let configs = vec![BackendConfig {
            name: "glm".to_string(),
            url: "https://api.example.com".to_string(),
            api_key: "test-key".to_string(),
            protocol: "openai".to_string(),
            model: None,
            match_rules: MatchRules {
                default: true,
                ..Default::default()
            },
        }];
        let router = Arc::new(
            crate::config::BackendRouter::new(configs)
                .expect("static test backend config must be accepted by the router"),
        );
        let mut providers = HashMap::new();
        providers.insert("glm".to_string(), Arc::new(GLMProvider) as Arc<dyn Provider>);
        CodexProxy::new(router, providers, log_body, std::path::PathBuf::from("logs"))
    }

    /// Build the default test proxy (`log_body = false`).
    fn make_test_proxy() -> CodexProxy {
        make_test_proxy_with_log_body(false)
    }

    /// Build a context flagged as a conversion request, routed through the proxy's
    /// router for path `/`, with `body` as the buffered request body.
    ///
    /// Panics if no backend is selected. `try_convert_request_body` rejects a missing
    /// backend before it looks at the body, so a silent `None` here would make the
    /// conversion tests fail for the wrong reason.
    fn conversion_ctx(proxy: &CodexProxy, body: &[u8]) -> super::ProxyContext {
        let mut ctx = super::ProxyContext::new();
        ctx.flags.is_conversion_request = true;
        ctx.route.selected_backend = proxy
            .router
            .select_with_config("/", &[])
            .map(|(_, info)| info.clone());
        assert!(
            ctx.route.selected_backend.is_some(),
            "test router must select the default backend"
        );
        ctx.buffers.request_body = body.to_vec();
        ctx
    }

    #[test]
    fn test_proxy_creation() {
        let proxy = make_test_proxy_with_log_body(true);
        assert!(proxy.log_body);
    }

    #[test]
    fn test_try_convert_request_body_rejects_malformed_json() {
        // Fail-Fast: a body that doesn't deserialize as ResponseRequest must
        // produce a ConversionError rather than silently being passed upstream.
        let proxy = make_test_proxy();
        let mut ctx = conversion_ctx(&proxy, b"{not valid json");
        let err = proxy
            .try_convert_request_body(&mut ctx)
            .expect_err("must fail on invalid JSON");
        assert!(matches!(err, crate::error::ConversionError::JsonError(_)));
    }

    #[test]
    fn test_try_convert_request_body_succeeds_on_minimal_request() {
        let proxy = make_test_proxy();
        let mut ctx = conversion_ctx(&proxy, br#"{"model":"glm-4","input":"hi"}"#);
        let bytes = proxy
            .try_convert_request_body(&mut ctx)
            .expect("conversion should succeed");
        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(json["model"], "glm-4");
        assert!(json["messages"].is_array());
    }

    /// Build a plain-text chat message with every optional field left unset.
    fn msg(role: MessageRole, text: &str) -> ChatMessage {
        ChatMessage {
            role,
            content: Content::String(text.to_string()),
            name: None,
            annotations: None,
            tool_calls: None,
            function_call: None,
            tool_call_id: None,
            refusal: None,
        }
    }

    #[test]
    fn test_merge_history_messages_prefers_previous_and_appends_incremental() {
        let history = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "hello"),
            msg(MessageRole::Assistant, "hi"),
        ];
        let current = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "hello"),
            msg(MessageRole::Assistant, "hi"),
            msg(MessageRole::User, "next question"),
        ];

        let merged = super::merge_history_messages(history, current);
        assert_eq!(merged.len(), 4);
        assert_eq!(merged[3].content.as_text(), "next question");
    }

    #[test]
    fn test_merge_history_messages_when_no_overlap_appends_all_current() {
        let history = vec![msg(MessageRole::System, "system"), msg(MessageRole::Assistant, "a1")];
        let current = vec![msg(MessageRole::User, "new question")];

        let merged = super::merge_history_messages(history, current);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[2].content.as_text(), "new question");
    }
}