Skip to main content

codex_convert_proxy/proxy/
filters.rs

1//! Pingora ProxyHttp trait implementation.
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use pingora_core::protocols::Digest;
8use pingora_core::protocols::TcpKeepalive;
9use pingora_core::Result as PingoraResult;
10use pingora_core::upstreams::peer::{ALPN, HttpPeer, Peer};
11use pingora_http::{RequestHeader, ResponseHeader};
12use pingora_proxy::{ProxyHttp, Session};
13use tracing::{debug, error, info, warn};
14
15use crate::constants::*;
16use crate::convert::{ResponseRequestContext, response_to_chat};
17use crate::proxy::context_store::ConversationSnapshot;
18use crate::types::chat_api::{ChatMessage, MessageRole};
19use crate::types::response_api::ResponseRequest;
20
21use super::context::ProxyContext;
22use super::core::CodexProxy;
23use crate::proxy::streaming_handler::StreamingResponseHandler;
24
25fn merge_history_messages(
26    mut history: Vec<ChatMessage>,
27    current_turn_messages: Vec<ChatMessage>,
28) -> Vec<ChatMessage> {
29    // prefer-previous strategy:
30    // history from previous_response_id is authoritative; only append incremental suffix.
31    let mut overlap = 0usize;
32    while overlap < history.len() && overlap < current_turn_messages.len() {
33        let same = serde_json::to_value(&history[overlap]).ok()
34            == serde_json::to_value(&current_turn_messages[overlap]).ok();
35        if !same {
36            break;
37        }
38        overlap += 1;
39    }
40
41    if overlap > 0 {
42        debug!(
43            "[REQUEST_CONVERT] detected {} overlapping history messages, appending incremental suffix only",
44            overlap
45        );
46    } else if !current_turn_messages.is_empty() {
47        debug!(
48            "[REQUEST_CONVERT] no overlap with cached history, appending all current messages as incremental"
49        );
50    }
51
52    history.extend(current_turn_messages.into_iter().skip(overlap));
53    history
54}
55
56#[async_trait]
57impl ProxyHttp for CodexProxy {
58    type CTX = ProxyContext;
59
    /// Creates the per-request context by delegating to `ProxyContext::new`.
    fn new_ctx(&self) -> Self::CTX {
        ProxyContext::new()
    }
63
64    /// Request filter - called for each request to select backend and prepare context.
65    async fn request_filter(
66        &self,
67        session: &mut Session,
68        ctx: &mut Self::CTX,
69    ) -> PingoraResult<bool>
70    where
71        Self::CTX: Send + Sync,
72    {
73        let method = session.req_header().method.as_str().to_string();
74        let path = session.req_header().uri.path().to_string();
75
76        // Collect headers for routing
77        let headers: Vec<(String, String)> = session
78            .req_header()
79            .headers
80            .iter()
81            .map(|(name, value)| {
82                (
83                    name.as_str().to_string(),
84                    value.to_str().unwrap_or("<binary>").to_string(),
85                )
86            })
87            .collect();
88
89        // Select backend with config to support path_prefix stripping.
90        let (backend_config, backend) = match self.router.select_with_config(&path, &headers) {
91            Some(pair) => pair,
92            None => {
93                warn!("[REQUEST] No matching backend for path: {}", path);
94                return Err(pingora_core::Error::new_str("No matching backend"));
95            }
96        };
97
98        let normalized_path = if let Some(prefix) = backend_config.match_rules.path_prefix.as_deref() {
99            let stripped = path.strip_prefix(prefix).unwrap_or(path.as_str());
100            if stripped.is_empty() {
101                "/".to_string()
102            } else if stripped.starts_with('/') {
103                stripped.to_string()
104            } else {
105                format!("/{}", stripped)
106            }
107        } else {
108            path.clone()
109        };
110        ctx.normalized_path = Some(normalized_path.clone());
111
112        // Check if this is a conversion request (Responses API -> Chat API)
113        let is_conversion = normalized_path.starts_with("/v1/responses") || normalized_path.starts_with("/responses");
114        ctx.is_conversion_request = is_conversion;
115
116        if is_conversion {
117            debug!("[REQUEST] {} {} -> {} (CONVERSION)", method, normalized_path, "conversion");
118        }
119
120        ctx.selected_backend = Some(backend.clone());
121        ctx.provider_name = Some(backend.name.clone());
122
123        debug!("[REQUEST] {} {} -> {}", method, normalized_path, backend.name);
124
125        // Return false to continue processing
126        Ok(false)
127    }
128
129    /// Select upstream peer for proxying.
130    async fn upstream_peer(
131        &self,
132        _session: &mut Session,
133        ctx: &mut Self::CTX,
134    ) -> PingoraResult<Box<HttpPeer>> {
135        let backend = ctx.selected_backend.as_ref().ok_or_else(|| {
136            error!("No backend selected");
137            pingora_core::Error::new_str("No backend selected")
138        })?;
139
140        let mut peer = HttpPeer::new(
141            (backend.host.as_str(), backend.port),
142            backend.use_tls,
143            backend.host.clone(),
144        );
145
146        // HTTP/2 priority
147        peer.options.alpn = ALPN::H2H1;
148
149        // Connection configuration
150        peer.options.connection_timeout = Some(Duration::from_secs(10));
151        peer.options.total_connection_timeout = Some(Duration::from_secs(30));
152        peer.options.idle_timeout = Some(Duration::from_secs(90));
153        peer.options.tcp_keepalive = Some(TcpKeepalive {
154            idle: Duration::from_secs(60),
155            interval: Duration::from_secs(5),
156            count: 5,
157        });
158
159        if backend.use_tls {
160            peer.options.h2_ping_interval = Some(Duration::from_secs(30));
161        }
162
163        Ok(Box::new(peer))
164    }
165
166    /// Filter and modify the upstream request.
167    async fn upstream_request_filter(
168        &self,
169        session: &mut Session,
170        upstream_request: &mut RequestHeader,
171        ctx: &mut Self::CTX,
172    ) -> PingoraResult<()> {
173        let backend = ctx.selected_backend.as_ref().ok_or_else(|| {
174            error!("No backend selected");
175            pingora_core::Error::new_str("No backend selected")
176        })?;
177
178        let original_uri = session.req_header().uri.clone();
179        let path = original_uri.path().to_string();
180        let query = original_uri.query();
181        let normalized_path = ctx.normalized_path.as_deref().unwrap_or(path.as_str());
182
183        // Check if this is a conversion request (Responses API → Chat API)
184        let is_conversion_request = normalized_path.starts_with("/v1/responses")
185            || normalized_path.starts_with("/responses");
186
187        // Get the provider's chat completions path (may differ per provider)
188        // Handle Responses API paths (/v1/responses, /responses) and also Chat API paths (/v1/chat/completions)
189        let chat_api_path = if normalized_path.starts_with("/v1/responses")
190            || normalized_path.starts_with("/responses")
191            || normalized_path.starts_with("/v1/chat/completions")
192        {
193            // Get provider and use its chat_completions_path
194            if let Some(provider) = self.get_provider(&backend.name) {
195                provider.chat_completions_path()
196            } else {
197                // Fallback to standard path
198                "/v1/chat/completions".to_string()
199            }
200        } else {
201            normalized_path.to_string()
202        };
203
204        // Prepend backend's base_path (e.g., /api/coding/paas/v4 for GLM)
205        let new_path = if !backend.base_path.is_empty() {
206            format!("{}{}", backend.base_path, chat_api_path)
207        } else {
208            chat_api_path
209        };
210
211        // Build new URI
212        let new_uri_str = if let Some(q) = query {
213            format!("{}?{}", new_path, q)
214        } else {
215            new_path.clone()
216        };
217
218        let new_uri: http::Uri = new_uri_str.parse().map_err(|e| {
219            error!("URI rewrite failed: {}", e);
220            pingora_core::Error::new_str("URI rewrite failed")
221        })?;
222        upstream_request.set_uri(new_uri);
223        ctx.rewritten_path = Some(new_path);
224
225        // Remove client authentication headers
226        upstream_request.remove_header("x-api-key");
227        upstream_request.remove_header("authorization");
228
229        // For conversion requests, remove content-length header since body size will change
230        // The upstream will use HTTP/2 DATA frame lengths instead of content-length validation
231        if is_conversion_request {
232            upstream_request.remove_header("content-length");
233            debug!("[UPSTREAM] Removed content-length for body transformation");
234        }
235
236        // Inject API key for this backend
237        upstream_request
238            .insert_header("authorization", format!("Bearer {}", backend.api_key))
239            .map_err(|e| {
240                error!("Failed to inject authorization header: {}", e);
241                pingora_core::Error::new_str("Header injection failed")
242            })?;
243
244        // Set host header
245        upstream_request
246            .insert_header("host", &backend.host)
247            .map_err(|e| {
248                error!("Failed to set host header: {}", e);
249                pingora_core::Error::new_str("Host header failed")
250            })?;
251
252        debug!(
253            "[UPSTREAM] {} {} -> {}",
254            upstream_request.method.as_str(),
255            upstream_request.uri,
256            backend.name
257        );
258
259        Ok(())
260    }
261
262    /// Capture and transform request body (Responses API → Chat API).
263    ///
264    /// For conversion requests, we need to buffer ALL chunks and send the
265    /// converted body at end_of_stream. This is because HTTP/2 DATA frames
266    /// arrive incrementally and we can't convert partial bodies.
267    async fn request_body_filter(
268        &self,
269        _session: &mut Session,
270        body: &mut Option<Bytes>,
271        end_of_stream: bool,
272        ctx: &mut Self::CTX,
273    ) -> PingoraResult<()>
274    where
275        Self::CTX: Send + Sync,
276    {
277        // For conversion requests, buffer all chunks and suppress forwarding
278        // until we have the complete body for conversion.
279        if ctx.is_conversion_request {
280            // Buffer the chunk with size limit check
281            if let Some(b) = body {
282                if ctx.request_body.len() + b.len() > MAX_REQUEST_BODY_SIZE {
283                    error!("[BODY] Request body exceeds maximum size limit of {} bytes", MAX_REQUEST_BODY_SIZE);
284                    return Err(pingora_core::Error::new_str("Request body too large"));
285                }
286                ctx.request_body.extend_from_slice(b);
287                debug!("[BODY] Buffered {} bytes (total: {})", b.len(), ctx.request_body.len());
288            }
289
290            // For conversion requests, suppress forwarding by returning empty body
291            // At end_of_stream, we'll send the converted body
292            *body = Some(Bytes::new());
293
294            // Only process when we have the complete body
295            if end_of_stream {
296                debug!("[BODY] Conversion request complete, {} bytes buffered", ctx.request_body.len());
297
298                let backend_name = ctx.selected_backend.as_ref().map(|b| b.name.clone());
299
300                // Parse and convert request
301                if let (Some(b_name), Some(_backend)) = (backend_name.as_ref(), ctx.selected_backend.as_ref())
302                    && let Some(provider) = self.get_provider(b_name) {
303                        // Get model override from backend config (extract value before mutable borrow)
304                        let model_override = ctx.selected_backend.as_ref()
305                            .and_then(|b| b.model.as_deref())
306                            .map(|s| s.to_string());
307                        // Parse as ResponseRequest
308                        match serde_json::from_slice::<ResponseRequest>(&ctx.request_body) {
309                            Ok(mut response_req) => {
310                                let mut previous_messages: Option<Vec<ChatMessage>> = None;
311                                if let Some(prev_id) = response_req.previous_response_id.clone() {
312                                    if let Some(snapshot) = self.get_conversation(&prev_id) {
313                                        if matches!(
314                                            &response_req.input,
315                                            crate::types::response_api::InputItemOrString::Array(_)
316                                        ) {
317                                            debug!(
318                                                "[REQUEST_CONVERT] previous_response_id + input[] detected, applying prefer-previous merge policy"
319                                            );
320                                        }
321                                        if response_req.instructions.is_none() {
322                                            response_req.instructions = snapshot.instructions.clone();
323                                        }
324                                        previous_messages = Some(snapshot.messages);
325                                    } else {
326                                        warn!(
327                                            "[REQUEST_CONVERT] previous_response_id not found in context store: {}",
328                                            prev_id
329                                        );
330                                    }
331                                }
332                                let context = ResponseRequestContext::from(&response_req);
333                                // Set context before converting (needs mutable ctx)
334                                ctx.set_response_request_context(context);
335                                // Convert using provider (model_override is now an owned String)
336                                match response_to_chat(response_req, provider.as_ref(), model_override.as_deref()) {
337                                    Ok(mut chat_req) => {
338                                        if let Some(history) = previous_messages {
339                                            chat_req.messages = merge_history_messages(history, chat_req.messages);
340                                        }
341                                        ctx.pending_instructions = chat_req
342                                            .messages
343                                            .iter()
344                                            .find(|m| m.role == MessageRole::System)
345                                            .map(|m| m.content.as_text());
346                                        ctx.pending_conversation_messages = Some(chat_req.messages.clone());
347                                        // Serialize ChatRequest as JSON
348                                        match serde_json::to_vec(&chat_req) {
349                                            Ok(converted) => {
350                                                let converted_len = converted.len();
351                                                // Log conversion if enabled
352                                                if self.log_body {
353                                                    debug!("[CONVERTED REQUEST] {}", String::from_utf8_lossy(&converted));
354                                                }
355                                                // Save converted request for debugging to logs directory
356                                                let path = self.log_dir.join("converted_request.json");
357                                                if std::fs::write(&path, &converted).is_ok() {
358                                                    debug!("[CONVERTED REQUEST SAVED] to {}", path.display());
359                                                }
360                                                // Replace body with converted request - this is what will be sent
361                                                *body = Some(Bytes::from(converted));
362                                                debug!("[BODY] Sending converted body: {} bytes", converted_len);
363                                            }
364                                            Err(e) => {
365                                                error!("Failed to serialize ChatRequest: {}", e);
366                                                // Restore original body to let upstream handle the error
367                                                *body = Some(Bytes::from(ctx.request_body.clone()));
368                                            }
369                                        }
370                                    }
371                                    Err(e) => {
372                                        error!("Failed to convert request: {}", e);
373                                        // Restore original body to let upstream handle the error
374                                        *body = Some(Bytes::from(ctx.request_body.clone()));
375                                    }
376                                }
377                            }
378                            Err(e) => {
379                                // If parsing fails, keep original body
380                                debug!("Failed to parse as ResponseRequest, keeping original: {}", e);
381                                // Restore original body to let upstream handle the error
382                                *body = Some(Bytes::from(ctx.request_body.clone()));
383                                // Save full request body to file for debugging
384                                let path = self.log_dir.join("codex_request_body.json");
385                                if std::fs::write(&path, &ctx.request_body).is_ok() {
386                                    debug!("[REQUEST BODY SAVED] to {}", path.display());
387                                }
388                            }
389                        }
390                    }
391
392                // Parse model name from original request for logging
393                ctx.init_from_request_body();
394            }
395
396            return Ok(());
397        }
398
399        // Non-conversion requests: pass through normally
400        if let Some(b) = body {
401            ctx.request_body.extend_from_slice(b);
402        }
403
404        // Only process when we have the complete body
405        if end_of_stream && !ctx.request_body.is_empty() {
406            // Parse model name from original request for logging
407            ctx.init_from_request_body();
408        }
409
410        Ok(())
411    }
412
413    /// Handle upstream response headers.
414    async fn response_filter(
415        &self,
416        _session: &mut Session,
417        upstream_response: &mut ResponseHeader,
418        ctx: &mut Self::CTX,
419    ) -> PingoraResult<()> {
420        let status = upstream_response.status.as_u16();
421        let content_type = upstream_response
422            .headers
423            .get("content-type")
424            .and_then(|v| v.to_str().ok())
425            .unwrap_or("")
426            .to_string();
427        let is_sse = content_type.to_ascii_lowercase().contains("text/event-stream");
428        let is_success = (200..300).contains(&status);
429
430        ctx.upstream_status = Some(status);
431        ctx.upstream_content_type = Some(content_type.clone());
432        ctx.should_convert_stream_response =
433            ctx.is_streaming && ctx.is_conversion_request && is_success && is_sse;
434
435        if ctx.is_conversion_request {
436            upstream_response.remove_header("content-length");
437            debug!(
438                "[RESPONSE] removed content-length for conversion response (status={}, content_type={})",
439                status,
440                content_type
441            );
442        }
443
444        if ctx.is_streaming && ctx.is_conversion_request && !ctx.should_convert_stream_response {
445            warn!(
446                "[RESPONSE] bypass stream conversion: status={}, content_type='{}', reason={}",
447                status,
448                content_type,
449                if !is_success {
450                    "upstream_non_2xx"
451                } else if !is_sse {
452                    "upstream_not_sse"
453                } else {
454                    "unknown"
455                }
456            );
457        }
458
459        debug!(
460            "[RESPONSE] status={}, is_streaming={}, is_conversion={}, should_convert_stream={}",
461            status,
462            ctx.is_streaming,
463            ctx.is_conversion_request,
464            ctx.should_convert_stream_response
465        );
466        Ok(())
467    }
468
469    /// Transform response body (Chat API → Responses API for streaming).
470    fn response_body_filter(
471        &self,
472        _session: &mut Session,
473        body: &mut Option<Bytes>,
474        end_of_body: bool,
475        ctx: &mut Self::CTX,
476    ) -> PingoraResult<Option<Duration>>
477    where
478        Self::CTX: Send + Sync,
479    {
480        // Clone the body bytes for processing
481        let body_clone = body.clone();
482
483        debug!(
484            "[RESPONSE_BODY] len={:?}, end={}, is_streaming={}, is_conversion={}",
485            body_clone.as_ref().map(|b| b.len()),
486            end_of_body,
487            ctx.is_streaming,
488            ctx.is_conversion_request
489        );
490
491        if let Some(b) = body_clone.as_ref() {
492            // Check response body size limit for non-streaming conversion only.
493            // For streaming conversion we compact parsed bytes to avoid unbounded growth
494            // and must not switch protocol mid-stream.
495            if !ctx.is_streaming
496                && ctx.is_conversion_request
497                && ctx.response_body.len() + b.len() > MAX_RESPONSE_BODY_SIZE
498            {
499                warn!(
500                    "[RESPONSE_BODY] Response body exceeds maximum size limit of {} bytes",
501                    MAX_RESPONSE_BODY_SIZE
502                );
503            } else {
504                ctx.response_body.extend_from_slice(b);
505            }
506
507            // Suppress intermediate chunks for non-streaming conversion requests
508            // (the converted body will be sent at end_of_body)
509            if !ctx.is_streaming && ctx.is_conversion_request && !end_of_body {
510                *body = Some(Bytes::new());
511            }
512
513            // For streaming conversion responses, convert each SSE chunk
514            if ctx.should_convert_stream_response {
515                // Suppress raw Chat API body immediately - it will be replaced
516                // with converted Responses API events below (or empty if no events)
517                *body = Some(Bytes::new());
518
519                // Get provider clone before mutable borrow to avoid borrow conflict
520                let provider = ctx.provider_name.as_ref()
521                    .and_then(|name| self.get_provider(name));
522
523                // Delegate to StreamingResponseHandler for chunk processing
524                let mut handler = StreamingResponseHandler::new(
525                    ctx,
526                    provider,
527                    self.log_body,
528                    self.conversation_store.clone(),
529                );
530
531                // Process streaming frames until end_of_body
532                if let Some(converted) = handler.process_stream_frame() {
533                    *body = Some(Bytes::from(converted));
534                }
535
536                // For streaming responses, append response.completed event at end_of_body
537                if end_of_body {
538                    let completed_events = handler.finalize_stream();
539                    if !completed_events.is_empty() {
540                        let existing = std::str::from_utf8(body.as_ref().unwrap_or(&Bytes::new()))
541                            .unwrap_or("")
542                            .to_string();
543                        let combined = format!("{}{}", existing, completed_events.join(""));
544                        *body = Some(Bytes::from(combined));
545                    }
546                }
547            }
548        }
549
550        if end_of_body {
551            let duration_ms = ctx.start_time.elapsed().as_millis() as u64;
552
553            // For non-streaming conversion responses, convert the full body
554            if !ctx.is_streaming && ctx.is_conversion_request && !ctx.response_body.is_empty()
555                && let Ok(text) = std::str::from_utf8(&ctx.response_body)
556                    && let Ok(chat_resp) = serde_json::from_str::<crate::types::chat_api::ChatResponse>(text) {
557                        let assistant_message = chat_resp.choices.first().map(|c| c.message.clone());
558                        // Get context from stream_state (set during request_body_filter)
559                        let request_context = ctx.stream_state.as_ref()
560                            .and_then(|s| s.request_context.as_ref());
561                        match crate::convert::chat_to_response_with_context(
562                            chat_resp,
563                            request_context,
564                        ) {
565                            Ok(response_obj) => {
566                                if let Ok(converted) = serde_json::to_vec(&response_obj) {
567                                    if self.log_body {
568                                        debug!("[CONVERTED RESPONSE] {}", String::from_utf8_lossy(&converted));
569                                    }
570                                    *body = Some(Bytes::from(converted));
571                                }
572                                if let (Some(mut messages), Some(assistant_message)) = (
573                                    ctx.pending_conversation_messages.clone(),
574                                    assistant_message,
575                                ) {
576                                    messages.push(assistant_message);
577                                    self.store_conversation(
578                                        response_obj.id.clone(),
579                                        ConversationSnapshot {
580                                            instructions: ctx.pending_instructions.clone(),
581                                            messages,
582                                        },
583                                    );
584                                }
585                            }
586                            Err(e) => {
587                                error!("Failed to convert response: {}", e);
588                            }
589                        }
590                    }
591
592            info!(
593                "[DONE] provider={}, model={:?}, duration={}ms",
594                ctx.provider_name.as_deref().unwrap_or("unknown"),
595                ctx.model.as_ref(),
596                duration_ms
597            );
598        }
599
600        Ok(None)
601    }
602
603    /// Called when connected to upstream.
604    async fn connected_to_upstream(
605        &self,
606        _session: &mut Session,
607        reused: bool,
608        peer: &HttpPeer,
609        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
610        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
611        digest: Option<&Digest>,
612        ctx: &mut Self::CTX,
613    ) -> PingoraResult<()> {
614        let tls_version = digest
615            .and_then(|d| d.ssl_digest.as_ref())
616            .map(|ssl| ssl.version.to_string())
617            .unwrap_or_else(|| "none".to_string());
618
619        let use_tls = ctx.selected_backend.as_ref().map(|b| b.use_tls).unwrap_or(false);
620        let backend_name = ctx.provider_name.as_deref().unwrap_or("unknown");
621
622        info!(
623            "[CONNECT] {} -> {} (backend={}, TLS={}, reused={}, tls_version={})",
624            peer.sni(),
625            peer.address(),
626            backend_name,
627            use_tls,
628            reused,
629            tls_version
630        );
631
632        Ok(())
633    }
634
635    /// Handle proxy errors.
636    fn error_while_proxy(
637        &self,
638        peer: &HttpPeer,
639        _session: &mut Session,
640        e: Box<pingora_core::Error>,
641        ctx: &mut Self::CTX,
642        _client_reused: bool,
643    ) -> Box<pingora_core::Error> {
644        error!(
645            "[ERROR] proxy error to {}: {}",
646            peer.address(),
647            e
648        );
649
650        let mut e = e.more_context(format!("Provider: {}", ctx.provider_name.as_deref().unwrap_or("unknown")));
651        e.retry.decide_reuse(false);
652        e
653    }
654
655    /// Handle fatal errors when proxy cannot be established.
656    async fn fail_to_proxy(
657        &self,
658        session: &mut Session,
659        e: &pingora_core::Error,
660        ctx: &mut Self::CTX,
661    ) -> pingora_proxy::FailToProxy
662    where
663        Self::CTX: Send + Sync,
664    {
665        let code = match *e.etype() {
666            pingora_core::ErrorType::ConnectTimedout => 504,
667            pingora_core::ErrorType::ConnectRefused => 502,
668            pingora_core::ErrorType::TLSHandshakeFailure => 502,
669            _ => 502,
670        };
671
672        let method = session.req_header().method.as_str();
673        let uri = &session.req_header().uri;
674
675        error!(
676            "[FAIL] {} {} -> {} (provider: {}, model: {:?}): {}",
677            method,
678            uri,
679            code,
680            ctx.provider_name.as_deref().unwrap_or("unknown"),
681            ctx.model.as_ref(),
682            e
683        );
684
685        // Return error response (use serde_json for proper escaping)
686        let error_body = serde_json::json!({
687            "error": {
688                "type": "proxy_error",
689                "code": code,
690                "message": e.to_string()
691            }
692        })
693        .to_string();
694        if let Ok(mut resp) = pingora_http::ResponseHeader::build(code, None) {
695            let _ = resp.insert_header("content-type", "application/json");
696            let _ = resp.insert_header("content-length", error_body.len().to_string());
697            let _ = session.write_response_header(Box::new(resp), false).await;
698            let _ = session
699                .write_response_body(Some(bytes::Bytes::from(error_body)), true)
700                .await;
701        }
702
703        pingora_proxy::FailToProxy {
704            error_code: code,
705            can_reuse_downstream: false,
706        }
707    }
708}
709
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::config::{BackendConfig, MatchRules};
    use crate::types::chat_api::{ChatMessage, Content, MessageRole};
    use crate::providers::GLMProvider;
    use crate::providers::Provider;

    use super::CodexProxy;

    /// Builds a plain-text chat message with every optional field unset.
    fn msg(role: MessageRole, text: &str) -> ChatMessage {
        ChatMessage {
            role,
            content: Content::String(text.to_string()),
            name: None,
            annotations: None,
            tool_calls: None,
            function_call: None,
            tool_call_id: None,
            refusal: None,
        }
    }

    /// A proxy built with body logging enabled reports `log_body == true`.
    #[test]
    fn test_proxy_creation() {
        let default_backend = BackendConfig {
            name: "glm".to_string(),
            url: "https://api.example.com".to_string(),
            api_key: "test-key".to_string(),
            protocol: "openai".to_string(),
            model: None,
            match_rules: MatchRules {
                default: true,
                ..Default::default()
            },
        };
        let router = Arc::new(crate::config::BackendRouter::new(vec![default_backend]).unwrap());

        let mut providers: HashMap<String, Box<dyn Provider + Send + Sync>> = HashMap::new();
        providers.insert("glm".to_string(), Box::new(GLMProvider));

        let proxy = CodexProxy::new(router, providers, true, std::path::PathBuf::from("logs"));
        assert!(proxy.log_body);
    }

    /// When the current turn repeats the cached history, only the new tail is appended.
    #[test]
    fn test_merge_history_messages_prefers_previous_and_appends_incremental() {
        let cached = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "hello"),
            msg(MessageRole::Assistant, "hi"),
        ];
        let incoming = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "hello"),
            msg(MessageRole::Assistant, "hi"),
            msg(MessageRole::User, "next question"),
        ];

        let merged = super::merge_history_messages(cached, incoming);

        assert_eq!(merged.len(), 4);
        assert_eq!(merged[3].content.as_text(), "next question");
    }

    /// Without a shared prefix, every current message is appended after the history.
    #[test]
    fn test_merge_history_messages_when_no_overlap_appends_all_current() {
        let cached = vec![
            msg(MessageRole::System, "system"),
            msg(MessageRole::Assistant, "a1"),
        ];
        let incoming = vec![msg(MessageRole::User, "new question")];

        let merged = super::merge_history_messages(cached, incoming);

        assert_eq!(merged.len(), 3);
        assert_eq!(merged[2].content.as_text(), "new question");
    }
}