Skip to main content

codex_convert_proxy/proxy/
filters.rs

1//! Pingora ProxyHttp trait implementation.
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use pingora_core::protocols::Digest;
8use pingora_core::protocols::TcpKeepalive;
9use pingora_core::Result as PingoraResult;
10use pingora_core::upstreams::peer::{ALPN, HttpPeer, Peer};
11use pingora_http::{RequestHeader, ResponseHeader};
12use pingora_proxy::{ProxyHttp, Session};
13use tracing::{debug, error, info, warn};
14
15use crate::constants::*;
16use crate::convert::{ResponseRequestContext, response_to_chat};
17use crate::proxy::context_store::ConversationSnapshot;
18use crate::types::chat_api::{ChatMessage, MessageRole};
19use crate::types::response_api::ResponseRequest;
20
21use super::context::ProxyContext;
22use super::core::CodexProxy;
23use crate::proxy::streaming_handler::StreamingResponseHandler;
24
25fn merge_history_messages(
26    mut history: Vec<ChatMessage>,
27    current_turn_messages: Vec<ChatMessage>,
28) -> Vec<ChatMessage> {
29    // prefer-previous strategy:
30    // history from previous_response_id is authoritative; only append incremental suffix.
31    let mut overlap = 0usize;
32    while overlap < history.len() && overlap < current_turn_messages.len() {
33        let same = serde_json::to_value(&history[overlap]).ok()
34            == serde_json::to_value(&current_turn_messages[overlap]).ok();
35        if !same {
36            break;
37        }
38        overlap += 1;
39    }
40
41    if overlap > 0 {
42        debug!(
43            "[REQUEST_CONVERT] detected {} overlapping history messages, appending incremental suffix only",
44            overlap
45        );
46    } else if !current_turn_messages.is_empty() {
47        debug!(
48            "[REQUEST_CONVERT] no overlap with cached history, appending all current messages as incremental"
49        );
50    }
51
52    history.extend(current_turn_messages.into_iter().skip(overlap));
53    history
54}
55
56#[async_trait]
57impl ProxyHttp for CodexProxy {
58    type CTX = ProxyContext;
59
60    fn new_ctx(&self) -> Self::CTX {
61        ProxyContext::new()
62    }
63
64    /// Request filter - called for each request to select backend and prepare context.
65    async fn request_filter(
66        &self,
67        session: &mut Session,
68        ctx: &mut Self::CTX,
69    ) -> PingoraResult<bool>
70    where
71        Self::CTX: Send + Sync,
72    {
73        let method = session.req_header().method.as_str().to_string();
74        let path = session.req_header().uri.path().to_string();
75
76        // Collect headers for routing
77        let headers: Vec<(String, String)> = session
78            .req_header()
79            .headers
80            .iter()
81            .map(|(name, value)| {
82                (
83                    name.as_str().to_string(),
84                    value.to_str().unwrap_or("<binary>").to_string(),
85                )
86            })
87            .collect();
88
89        // Select backend with config to support path_prefix stripping.
90        let (backend_config, backend) = match self.router.select_with_config(&path, &headers) {
91            Some(pair) => pair,
92            None => {
93                warn!("[REQUEST] No matching backend for path: {}", path);
94                return Err(pingora_core::Error::new_str("No matching backend"));
95            }
96        };
97
98        let normalized_path = if let Some(prefix) = backend_config.match_rules.path_prefix.as_deref() {
99            let stripped = path.strip_prefix(prefix).unwrap_or(path.as_str());
100            if stripped.is_empty() {
101                "/".to_string()
102            } else if stripped.starts_with('/') {
103                stripped.to_string()
104            } else {
105                format!("/{}", stripped)
106            }
107        } else {
108            path.clone()
109        };
110        ctx.normalized_path = Some(normalized_path.clone());
111
112        // Check if this is a conversion request (Responses API -> Chat API)
113        let is_conversion = (normalized_path.starts_with("/v1/responses") || normalized_path.starts_with("/responses")) && method == "POST";
114        ctx.is_conversion_request = is_conversion;
115
116        if is_conversion {
117            debug!("[REQUEST] {} {} -> {} (CONVERSION)", method, normalized_path, "conversion");
118        }
119
120        ctx.selected_backend = Some(backend.clone());
121        ctx.provider_name = Some(backend.name.clone());
122
123        debug!("[REQUEST] {} {} -> {}", method, normalized_path, backend.name);
124
125        // Return false to continue processing
126        Ok(false)
127    }
128
129    /// Select upstream peer for proxying.
130    async fn upstream_peer(
131        &self,
132        _session: &mut Session,
133        ctx: &mut Self::CTX,
134    ) -> PingoraResult<Box<HttpPeer>> {
135        let backend = ctx.selected_backend.as_ref().ok_or_else(|| {
136            error!("No backend selected");
137            pingora_core::Error::new_str("No backend selected")
138        })?;
139
140        let mut peer = HttpPeer::new(
141            (backend.host.as_str(), backend.port),
142            backend.use_tls,
143            backend.host.clone(),
144        );
145
146        // HTTP/2 priority
147        peer.options.alpn = ALPN::H2H1;
148
149        // Connection configuration
150        peer.options.connection_timeout = Some(Duration::from_secs(10));
151        peer.options.total_connection_timeout = Some(Duration::from_secs(30));
152        peer.options.idle_timeout = Some(Duration::from_secs(90));
153        peer.options.tcp_keepalive = Some(TcpKeepalive {
154            idle: Duration::from_secs(60),
155            interval: Duration::from_secs(5),
156            count: 5,
157        });
158
159        if backend.use_tls {
160            peer.options.h2_ping_interval = Some(Duration::from_secs(30));
161        }
162
163        Ok(Box::new(peer))
164    }
165
166    /// Filter and modify the upstream request.
167    async fn upstream_request_filter(
168        &self,
169        session: &mut Session,
170        upstream_request: &mut RequestHeader,
171        ctx: &mut Self::CTX,
172    ) -> PingoraResult<()> {
173        let backend = ctx.selected_backend.as_ref().ok_or_else(|| {
174            error!("No backend selected");
175            pingora_core::Error::new_str("No backend selected")
176        })?;
177
178        let original_uri = session.req_header().uri.clone();
179        let path = original_uri.path().to_string();
180        let query = original_uri.query();
181        let normalized_path = ctx.normalized_path.as_deref().unwrap_or(path.as_str());
182
183        // Check if this is a conversion request (Responses API → Chat API)
184        let is_conversion_request = (normalized_path.starts_with("/v1/responses")
185            || normalized_path.starts_with("/responses"))
186            && upstream_request.method.as_str() == "POST";
187
188        // Get the provider's chat completions path (may differ per provider)
189        // Handle Responses API paths (/v1/responses, /responses) and also Chat API paths (/v1/chat/completions)
190        let chat_api_path = if is_conversion_request || normalized_path.starts_with("/v1/chat/completions") {
191            // Get provider and use its chat_completions_path
192            if let Some(provider) = self.get_provider(&backend.name) {
193                provider.chat_completions_path()
194            } else {
195                // Fallback to standard path
196                "/v1/chat/completions".to_string()
197            }
198        } else {
199            normalized_path.to_string()
200        };
201
202        // Prepend backend's base_path (e.g., /api/coding/paas/v4 for GLM)
203        let new_path = if !backend.base_path.is_empty() {
204            format!("{}{}", backend.base_path, chat_api_path)
205        } else {
206            chat_api_path
207        };
208
209        // Build new URI
210        let new_uri_str = if let Some(q) = query {
211            format!("{}?{}", new_path, q)
212        } else {
213            new_path.clone()
214        };
215
216        let new_uri: http::Uri = new_uri_str.parse().map_err(|e| {
217            error!("URI rewrite failed: {}", e);
218            pingora_core::Error::new_str("URI rewrite failed")
219        })?;
220        upstream_request.set_uri(new_uri);
221        ctx.rewritten_path = Some(new_path);
222
223        // Remove client authentication headers
224        upstream_request.remove_header("x-api-key");
225        upstream_request.remove_header("authorization");
226
227        // For conversion requests, remove content-length header since body size will change
228        // The upstream will use HTTP/2 DATA frame lengths instead of content-length validation
229        if is_conversion_request {
230            upstream_request.remove_header("content-length");
231            debug!("[UPSTREAM] Removed content-length for body transformation");
232        }
233
234        // Inject API key for this backend
235        upstream_request
236            .insert_header("authorization", format!("Bearer {}", backend.api_key))
237            .map_err(|e| {
238                error!("Failed to inject authorization header: {}", e);
239                pingora_core::Error::new_str("Header injection failed")
240            })?;
241
242        // Set host header
243        upstream_request
244            .insert_header("host", &backend.host)
245            .map_err(|e| {
246                error!("Failed to set host header: {}", e);
247                pingora_core::Error::new_str("Host header failed")
248            })?;
249
250        debug!(
251            "[UPSTREAM] {} {} -> {}",
252            upstream_request.method.as_str(),
253            upstream_request.uri,
254            backend.name
255        );
256
257        Ok(())
258    }
259
260    /// Capture and transform request body (Responses API → Chat API).
261    ///
262    /// For conversion requests, we need to buffer ALL chunks and send the
263    /// converted body at end_of_stream. This is because HTTP/2 DATA frames
264    /// arrive incrementally and we can't convert partial bodies.
265    async fn request_body_filter(
266        &self,
267        _session: &mut Session,
268        body: &mut Option<Bytes>,
269        end_of_stream: bool,
270        ctx: &mut Self::CTX,
271    ) -> PingoraResult<()>
272    where
273        Self::CTX: Send + Sync,
274    {
275        // For conversion requests, buffer all chunks and suppress forwarding
276        // until we have the complete body for conversion.
277        if ctx.is_conversion_request {
278            // Buffer the chunk with size limit check
279            if let Some(b) = body {
280                if ctx.request_body.len() + b.len() > MAX_REQUEST_BODY_SIZE {
281                    error!("[BODY] Request body exceeds maximum size limit of {} bytes", MAX_REQUEST_BODY_SIZE);
282                    return Err(pingora_core::Error::new_str("Request body too large"));
283                }
284                ctx.request_body.extend_from_slice(b);
285                debug!("[BODY] Buffered {} bytes (total: {})", b.len(), ctx.request_body.len());
286            }
287
288            // For conversion requests, suppress forwarding by returning empty body
289            // At end_of_stream, we'll send the converted body
290            *body = Some(Bytes::new());
291
292            // Only process when we have the complete body
293            if end_of_stream {
294                debug!("[BODY] Conversion request complete, {} bytes buffered", ctx.request_body.len());
295
296                let backend_name = ctx.selected_backend.as_ref().map(|b| b.name.clone());
297
298                // Parse and convert request
299                if let (Some(b_name), Some(_backend)) = (backend_name.as_ref(), ctx.selected_backend.as_ref())
300                    && let Some(provider) = self.get_provider(b_name) {
301                        // Get model override from backend config (extract value before mutable borrow)
302                        let model_override = ctx.selected_backend.as_ref()
303                            .and_then(|b| b.model.as_deref())
304                            .map(|s| s.to_string());
305                        // Parse as ResponseRequest
306                        match serde_json::from_slice::<ResponseRequest>(&ctx.request_body) {
307                            Ok(mut response_req) => {
308                                let mut previous_messages: Option<Vec<ChatMessage>> = None;
309                                if let Some(prev_id) = response_req.previous_response_id.clone() {
310                                    if let Some(snapshot) = self.get_conversation(&prev_id) {
311                                        if matches!(
312                                            &response_req.input,
313                                            crate::types::response_api::InputItemOrString::Array(_)
314                                        ) {
315                                            debug!(
316                                                "[REQUEST_CONVERT] previous_response_id + input[] detected, applying prefer-previous merge policy"
317                                            );
318                                        }
319                                        if response_req.instructions.is_none() {
320                                            response_req.instructions = snapshot.instructions.clone();
321                                        }
322                                        previous_messages = Some(snapshot.messages);
323                                    } else {
324                                        warn!(
325                                            "[REQUEST_CONVERT] previous_response_id not found in context store: {}",
326                                            prev_id
327                                        );
328                                    }
329                                }
330                                let context = ResponseRequestContext::from(&response_req);
331                                // Set context before converting (needs mutable ctx)
332                                ctx.set_response_request_context(context);
333                                // Convert using provider (model_override is now an owned String)
334                                match response_to_chat(response_req, provider.as_ref(), model_override.as_deref()) {
335                                    Ok(mut chat_req) => {
336                                        if let Some(history) = previous_messages {
337                                            chat_req.messages = merge_history_messages(history, chat_req.messages);
338                                        }
339                                        ctx.pending_instructions = chat_req
340                                            .messages
341                                            .iter()
342                                            .find(|m| m.role == MessageRole::System)
343                                            .map(|m| m.content.as_text());
344                                        ctx.pending_conversation_messages = Some(chat_req.messages.clone());
345                                        // Serialize ChatRequest as JSON
346                                        match serde_json::to_vec(&chat_req) {
347                                            Ok(converted) => {
348                                                let converted_len = converted.len();
349                                                // Log conversion if enabled
350                                                if self.log_body {
351                                                    debug!("[CONVERTED REQUEST] {}", String::from_utf8_lossy(&converted));
352                                                }
353                                                // Save converted request for debugging to logs directory
354                                                let path = self.log_dir.join("converted_request.json");
355                                                if std::fs::write(&path, &converted).is_ok() {
356                                                    debug!("[CONVERTED REQUEST SAVED] to {}", path.display());
357                                                }
358                                                // Replace body with converted request - this is what will be sent
359                                                *body = Some(Bytes::from(converted));
360                                                debug!("[BODY] Sending converted body: {} bytes", converted_len);
361                                            }
362                                            Err(e) => {
363                                                error!("Failed to serialize ChatRequest: {}", e);
364                                                // Restore original body to let upstream handle the error
365                                                *body = Some(Bytes::from(ctx.request_body.clone()));
366                                            }
367                                        }
368                                    }
369                                    Err(e) => {
370                                        error!("Failed to convert request: {}", e);
371                                        // Restore original body to let upstream handle the error
372                                        *body = Some(Bytes::from(ctx.request_body.clone()));
373                                    }
374                                }
375                            }
376                            Err(e) => {
377                                // If parsing fails, keep original body
378                                debug!("Failed to parse as ResponseRequest, keeping original: {}", e);
379                                // Restore original body to let upstream handle the error
380                                *body = Some(Bytes::from(ctx.request_body.clone()));
381                                // Save full request body to file for debugging
382                                let path = self.log_dir.join("codex_request_body.json");
383                                if std::fs::write(&path, &ctx.request_body).is_ok() {
384                                    debug!("[REQUEST BODY SAVED] to {}", path.display());
385                                }
386                            }
387                        }
388                    }
389
390                // Parse model name from original request for logging
391                ctx.init_from_request_body();
392            }
393
394            return Ok(());
395        }
396
397        // Non-conversion requests: pass through normally
398        if let Some(b) = body {
399            ctx.request_body.extend_from_slice(b);
400        }
401
402        // Only process when we have the complete body
403        if end_of_stream && !ctx.request_body.is_empty() {
404            // Parse model name from original request for logging
405            ctx.init_from_request_body();
406        }
407
408        Ok(())
409    }
410
411    /// Handle upstream response headers.
412    async fn response_filter(
413        &self,
414        _session: &mut Session,
415        upstream_response: &mut ResponseHeader,
416        ctx: &mut Self::CTX,
417    ) -> PingoraResult<()> {
418        let status = upstream_response.status.as_u16();
419        let content_type = upstream_response
420            .headers
421            .get("content-type")
422            .and_then(|v| v.to_str().ok())
423            .unwrap_or("")
424            .to_string();
425        let is_sse = content_type.to_ascii_lowercase().contains("text/event-stream");
426        let is_success = (200..300).contains(&status);
427
428        ctx.upstream_status = Some(status);
429        ctx.upstream_content_type = Some(content_type.clone());
430        ctx.should_convert_stream_response =
431            ctx.is_streaming && ctx.is_conversion_request && is_success && is_sse;
432
433        if ctx.is_conversion_request {
434            upstream_response.remove_header("content-length");
435            debug!(
436                "[RESPONSE] removed content-length for conversion response (status={}, content_type={})",
437                status,
438                content_type
439            );
440        }
441
442        if ctx.is_streaming && ctx.is_conversion_request && !ctx.should_convert_stream_response {
443            warn!(
444                "[RESPONSE] bypass stream conversion: status={}, content_type='{}', reason={}",
445                status,
446                content_type,
447                if !is_success {
448                    "upstream_non_2xx"
449                } else if !is_sse {
450                    "upstream_not_sse"
451                } else {
452                    "unknown"
453                }
454            );
455        }
456
457        debug!(
458            "[RESPONSE] status={}, is_streaming={}, is_conversion={}, should_convert_stream={}",
459            status,
460            ctx.is_streaming,
461            ctx.is_conversion_request,
462            ctx.should_convert_stream_response
463        );
464        Ok(())
465    }
466
467    /// Transform response body (Chat API → Responses API for streaming).
468    fn response_body_filter(
469        &self,
470        _session: &mut Session,
471        body: &mut Option<Bytes>,
472        end_of_body: bool,
473        ctx: &mut Self::CTX,
474    ) -> PingoraResult<Option<Duration>>
475    where
476        Self::CTX: Send + Sync,
477    {
478        // Clone the body bytes for processing
479        let body_clone = body.clone();
480
481        debug!(
482            "[RESPONSE_BODY] len={:?}, end={}, is_streaming={}, is_conversion={}",
483            body_clone.as_ref().map(|b| b.len()),
484            end_of_body,
485            ctx.is_streaming,
486            ctx.is_conversion_request
487        );
488
489        if let Some(b) = body_clone.as_ref() {
490            // Check response body size limit for non-streaming conversion only.
491            // For streaming conversion we compact parsed bytes to avoid unbounded growth
492            // and must not switch protocol mid-stream.
493            if !ctx.is_streaming
494                && ctx.is_conversion_request
495                && ctx.response_body.len() + b.len() > MAX_RESPONSE_BODY_SIZE
496            {
497                warn!(
498                    "[RESPONSE_BODY] Response body exceeds maximum size limit of {} bytes",
499                    MAX_RESPONSE_BODY_SIZE
500                );
501            } else {
502                ctx.response_body.extend_from_slice(b);
503            }
504
505            // Suppress intermediate chunks for non-streaming conversion requests
506            // (the converted body will be sent at end_of_body)
507            if !ctx.is_streaming && ctx.is_conversion_request && !end_of_body {
508                *body = Some(Bytes::new());
509            }
510
511            // For streaming conversion responses, convert each SSE chunk
512            if ctx.should_convert_stream_response {
513                // Suppress raw Chat API body immediately - it will be replaced
514                // with converted Responses API events below (or empty if no events)
515                *body = Some(Bytes::new());
516
517                // Get provider clone before mutable borrow to avoid borrow conflict
518                let provider = ctx.provider_name.as_ref()
519                    .and_then(|name| self.get_provider(name));
520
521                // Delegate to StreamingResponseHandler for chunk processing
522                let mut handler = StreamingResponseHandler::new(
523                    ctx,
524                    provider,
525                    self.log_body,
526                    self.conversation_store.clone(),
527                );
528
529                // Process streaming frames until end_of_body
530                if let Some(converted) = handler.process_stream_frame() {
531                    *body = Some(Bytes::from(converted));
532                }
533
534                // For streaming responses, append response.completed event at end_of_body
535                if end_of_body {
536                    let completed_events = handler.finalize_stream();
537                    if !completed_events.is_empty() {
538                        let existing = std::str::from_utf8(body.as_ref().unwrap_or(&Bytes::new()))
539                            .unwrap_or("")
540                            .to_string();
541                        let combined = format!("{}{}", existing, completed_events.join(""));
542                        *body = Some(Bytes::from(combined));
543                    }
544                }
545            }
546        }
547
548        if end_of_body {
549            let duration_ms = ctx.start_time.elapsed().as_millis() as u64;
550
551            // For non-streaming conversion responses, convert the full body
552            if !ctx.is_streaming && ctx.is_conversion_request && !ctx.response_body.is_empty()
553                && let Ok(text) = std::str::from_utf8(&ctx.response_body)
554                    && let Ok(chat_resp) = serde_json::from_str::<crate::types::chat_api::ChatResponse>(text) {
555                        let assistant_message = chat_resp.choices.first().map(|c| c.message.clone());
556                        // Get context from stream_state (set during request_body_filter)
557                        let request_context = ctx.stream_state.as_ref()
558                            .and_then(|s| s.request_context.as_ref());
559                        match crate::convert::chat_to_response_with_context(
560                            chat_resp,
561                            request_context,
562                        ) {
563                            Ok(response_obj) => {
564                                if let Ok(converted) = serde_json::to_vec(&response_obj) {
565                                    if self.log_body {
566                                        debug!("[CONVERTED RESPONSE] {}", String::from_utf8_lossy(&converted));
567                                    }
568                                    *body = Some(Bytes::from(converted));
569                                }
570                                if let (Some(mut messages), Some(assistant_message)) = (
571                                    ctx.pending_conversation_messages.clone(),
572                                    assistant_message,
573                                ) {
574                                    messages.push(assistant_message);
575                                    self.store_conversation(
576                                        response_obj.id.clone(),
577                                        ConversationSnapshot {
578                                            instructions: ctx.pending_instructions.clone(),
579                                            messages,
580                                        },
581                                    );
582                                }
583                            }
584                            Err(e) => {
585                                error!("Failed to convert response: {}", e);
586                            }
587                        }
588                    }
589
590            info!(
591                "[DONE] provider={}, model={:?}, duration={}ms",
592                ctx.provider_name.as_deref().unwrap_or("unknown"),
593                ctx.model.as_ref(),
594                duration_ms
595            );
596        }
597
598        Ok(None)
599    }
600
601    /// Called when connected to upstream.
602    async fn connected_to_upstream(
603        &self,
604        _session: &mut Session,
605        reused: bool,
606        peer: &HttpPeer,
607        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
608        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
609        digest: Option<&Digest>,
610        ctx: &mut Self::CTX,
611    ) -> PingoraResult<()> {
612        let tls_version = digest
613            .and_then(|d| d.ssl_digest.as_ref())
614            .map(|ssl| ssl.version.to_string())
615            .unwrap_or_else(|| "none".to_string());
616
617        let use_tls = ctx.selected_backend.as_ref().map(|b| b.use_tls).unwrap_or(false);
618        let backend_name = ctx.provider_name.as_deref().unwrap_or("unknown");
619
620        info!(
621            "[CONNECT] {} -> {} (backend={}, TLS={}, reused={}, tls_version={})",
622            peer.sni(),
623            peer.address(),
624            backend_name,
625            use_tls,
626            reused,
627            tls_version
628        );
629
630        Ok(())
631    }
632
633    /// Handle proxy errors.
634    fn error_while_proxy(
635        &self,
636        peer: &HttpPeer,
637        _session: &mut Session,
638        e: Box<pingora_core::Error>,
639        ctx: &mut Self::CTX,
640        _client_reused: bool,
641    ) -> Box<pingora_core::Error> {
642        error!(
643            "[ERROR] proxy error to {}: {}",
644            peer.address(),
645            e
646        );
647
648        let mut e = e.more_context(format!("Provider: {}", ctx.provider_name.as_deref().unwrap_or("unknown")));
649        e.retry.decide_reuse(false);
650        e
651    }
652
653    /// Handle fatal errors when proxy cannot be established.
654    async fn fail_to_proxy(
655        &self,
656        session: &mut Session,
657        e: &pingora_core::Error,
658        ctx: &mut Self::CTX,
659    ) -> pingora_proxy::FailToProxy
660    where
661        Self::CTX: Send + Sync,
662    {
663        let code = match *e.etype() {
664            pingora_core::ErrorType::ConnectTimedout => 504,
665            pingora_core::ErrorType::ConnectRefused => 502,
666            pingora_core::ErrorType::TLSHandshakeFailure => 502,
667            _ => 502,
668        };
669
670        let method = session.req_header().method.as_str();
671        let uri = &session.req_header().uri;
672
673        error!(
674            "[FAIL] {} {} -> {} (provider: {}, model: {:?}): {}",
675            method,
676            uri,
677            code,
678            ctx.provider_name.as_deref().unwrap_or("unknown"),
679            ctx.model.as_ref(),
680            e
681        );
682
683        // Return error response (use serde_json for proper escaping)
684        let error_body = serde_json::json!({
685            "error": {
686                "type": "proxy_error",
687                "code": code,
688                "message": e.to_string()
689            }
690        })
691        .to_string();
692        if let Ok(mut resp) = pingora_http::ResponseHeader::build(code, None) {
693            let _ = resp.insert_header("content-type", "application/json");
694            let _ = resp.insert_header("content-length", error_body.len().to_string());
695            let _ = session.write_response_header(Box::new(resp), false).await;
696            let _ = session
697                .write_response_body(Some(bytes::Bytes::from(error_body)), true)
698                .await;
699        }
700
701        pingora_proxy::FailToProxy {
702            error_code: code,
703            can_reuse_downstream: false,
704        }
705    }
706}
707
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::config::{BackendConfig, MatchRules};
    use crate::types::chat_api::{ChatMessage, Content, MessageRole};
    use crate::providers::GLMProvider;
    use crate::providers::Provider;

    use super::CodexProxy;

    #[test]
    fn test_proxy_creation() {
        let configs = vec![BackendConfig {
            name: "glm".to_string(),
            url: "https://api.example.com".to_string(),
            api_key: "test-key".to_string(),
            protocol: "openai".to_string(),
            model: None,
            match_rules: MatchRules {
                default: true,
                ..Default::default()
            },
        }];
        let router = Arc::new(crate::config::BackendRouter::new(configs).unwrap());
        let mut providers = HashMap::new();
        providers.insert("glm".to_string(), Box::new(GLMProvider) as Box<dyn Provider + Send + Sync>);
        let proxy = CodexProxy::new(router, providers, true, std::path::PathBuf::from("logs"));
        assert!(proxy.log_body);
    }

    /// Build a plain-text chat message with all optional fields unset.
    fn msg(role: MessageRole, text: &str) -> ChatMessage {
        ChatMessage {
            role,
            content: Content::String(text.to_string()),
            name: None,
            annotations: None,
            tool_calls: None,
            function_call: None,
            tool_call_id: None,
            refusal: None,
        }
    }

    #[test]
    fn test_merge_history_messages_prefers_previous_and_appends_incremental() {
        let history = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "hello"),
            msg(MessageRole::Assistant, "hi"),
        ];
        let current = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "hello"),
            msg(MessageRole::Assistant, "hi"),
            msg(MessageRole::User, "next question"),
        ];

        let merged = super::merge_history_messages(history, current);
        assert_eq!(merged.len(), 4);
        assert_eq!(merged[3].content.as_text(), "next question");
    }

    #[test]
    fn test_merge_history_messages_when_no_overlap_appends_all_current() {
        let history = vec![msg(MessageRole::System, "system"), msg(MessageRole::Assistant, "a1")];
        let current = vec![msg(MessageRole::User, "new question")];

        let merged = super::merge_history_messages(history, current);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[2].content.as_text(), "new question");
    }

    /// Typical `previous_response_id` turn: only the system prompt is shared, so the whole
    /// cached history is kept and the new user message is appended after it.
    #[test]
    fn test_merge_history_messages_partial_overlap_keeps_history_and_appends_new_turn() {
        let history = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "hello"),
            msg(MessageRole::Assistant, "hi"),
        ];
        let current = vec![
            msg(MessageRole::System, "You are helpful"),
            msg(MessageRole::User, "next question"),
        ];

        let merged = super::merge_history_messages(history, current);
        assert_eq!(merged.len(), 4);
        assert_eq!(merged[0].content.as_text(), "You are helpful");
        assert_eq!(merged[1].content.as_text(), "hello");
        assert_eq!(merged[2].content.as_text(), "hi");
        assert_eq!(merged[3].content.as_text(), "next question");
    }

    #[test]
    fn test_merge_history_messages_with_empty_current_turn_returns_history() {
        let history = vec![msg(MessageRole::User, "hello"), msg(MessageRole::Assistant, "hi")];

        let merged = super::merge_history_messages(history, Vec::new());
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[1].content.as_text(), "hi");
    }

    #[test]
    fn test_merge_history_messages_with_empty_history_returns_current() {
        let current = vec![msg(MessageRole::User, "first")];

        let merged = super::merge_history_messages(Vec::new(), current);
        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0].content.as_text(), "first");
    }
}