1use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use pingora_core::protocols::Digest;
8use pingora_core::protocols::TcpKeepalive;
9use pingora_core::Result as PingoraResult;
10use pingora_core::upstreams::peer::{ALPN, HttpPeer, Peer};
11use pingora_http::{RequestHeader, ResponseHeader};
12use pingora_proxy::{ProxyHttp, Session};
13use tracing::{debug, error, info, warn};
14
15use crate::constants::*;
16use crate::convert::{ResponseRequestContext, response_to_chat, ToolPriority};
17use crate::proxy::context_store::ConversationSnapshot;
18use crate::types::chat_api::{ChatMessage, MessageRole};
19use crate::types::response_api::ResponseRequest;
20
21use super::context::ProxyContext;
22use super::core::CodexProxy;
23use crate::proxy::streaming_handler::StreamingResponseHandler;
24
25impl CodexProxy {
26 fn try_convert_request_body(
39 &self,
40 ctx: &mut ProxyContext,
41 ) -> Result<Vec<u8>, crate::error::ConversionError> {
42 use crate::error::ConversionError;
43
44 let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
45 ConversionError::ProviderError("no backend selected".to_string())
46 })?;
47 let model_override = backend.model.clone();
48 let provider = self.get_provider(&backend.name).ok_or_else(|| {
49 ConversionError::ProviderError(format!(
50 "no provider registered for backend '{}'",
51 backend.name
52 ))
53 })?;
54
55 let mut response_req: ResponseRequest =
56 serde_json::from_slice(&ctx.buffers.request_body)?;
57 ctx.init_from_response_request(&response_req);
58
59 let mut previous_messages: Option<Vec<ChatMessage>> = None;
61 if let Some(prev_id) = response_req.previous_response_id.clone() {
62 if let Some(snapshot) = self.get_conversation(&prev_id) {
63 if matches!(
64 &response_req.input,
65 crate::types::response_api::InputItemOrString::Array(_)
66 ) {
67 debug!(
68 "[REQUEST_CONVERT] previous_response_id + input[] detected, applying prefer-previous merge policy"
69 );
70 }
71 if response_req.instructions.is_none() {
72 response_req.instructions = snapshot.instructions.clone();
73 }
74 previous_messages = Some(snapshot.messages);
75 } else {
76 warn!(
77 "[REQUEST_CONVERT] previous_response_id not found in context store: {}",
78 prev_id
79 );
80 }
81 }
82
83 let context = ResponseRequestContext::from(&response_req);
84 ctx.set_response_request_context(context);
85
86 let mut chat_req = response_to_chat(
87 response_req,
88 provider.as_ref(),
89 model_override.as_deref(),
90 ToolPriority::Merge,
91 )?;
92
93 if let Some(history) = previous_messages {
94 chat_req.messages = merge_history_messages(history, chat_req.messages);
95 }
96
97 ctx.follow_up.pending_instructions = chat_req
98 .messages
99 .iter()
100 .find(|m| m.role == MessageRole::System)
101 .map(|m| m.content.as_text());
102 ctx.follow_up.pending_conversation_messages = Some(chat_req.messages.clone());
103
104 serde_json::to_vec(&chat_req).map_err(ConversionError::from)
105 }
106}
107
108fn merge_history_messages(
109 mut history: Vec<ChatMessage>,
110 current_turn_messages: Vec<ChatMessage>,
111) -> Vec<ChatMessage> {
112 let mut overlap = 0usize;
115 while overlap < history.len() && overlap < current_turn_messages.len() {
116 let same = serde_json::to_value(&history[overlap]).ok()
117 == serde_json::to_value(¤t_turn_messages[overlap]).ok();
118 if !same {
119 break;
120 }
121 overlap += 1;
122 }
123
124 if overlap > 0 {
125 debug!(
126 "[REQUEST_CONVERT] detected {} overlapping history messages, appending incremental suffix only",
127 overlap
128 );
129 } else if !current_turn_messages.is_empty() {
130 debug!(
131 "[REQUEST_CONVERT] no overlap with cached history, appending all current messages as incremental"
132 );
133 }
134
135 history.extend(current_turn_messages.into_iter().skip(overlap));
136 history
137}
138
139#[async_trait]
140impl ProxyHttp for CodexProxy {
141 type CTX = ProxyContext;
142
    /// Builds the context value for a request by calling `ProxyContext::new()`.
    fn new_ctx(&self) -> Self::CTX {
        ProxyContext::new()
    }
146
147 async fn request_filter(
149 &self,
150 session: &mut Session,
151 ctx: &mut Self::CTX,
152 ) -> PingoraResult<bool>
153 where
154 Self::CTX: Send + Sync,
155 {
156 let method = session.req_header().method.as_str().to_string();
157 let path = session.req_header().uri.path().to_string();
158
159 let headers: Vec<(String, String)> = session
161 .req_header()
162 .headers
163 .iter()
164 .map(|(name, value)| {
165 (
166 name.as_str().to_string(),
167 value.to_str().unwrap_or("<binary>").to_string(),
168 )
169 })
170 .collect();
171
172 let (backend_config, backend) = match self.router.select_with_config(&path, &headers) {
174 Some(pair) => pair,
175 None => {
176 warn!("[REQUEST] No matching backend for path: {}", path);
177 return Err(pingora_core::Error::new_str("No matching backend"));
178 }
179 };
180
181 let normalized_path = if let Some(prefix) = backend_config.match_rules.path_prefix.as_deref() {
182 let stripped = path.strip_prefix(prefix).unwrap_or(path.as_str());
183 if stripped.is_empty() {
184 "/".to_string()
185 } else if stripped.starts_with('/') {
186 stripped.to_string()
187 } else {
188 format!("/{}", stripped)
189 }
190 } else {
191 path.clone()
192 };
193 ctx.route.normalized_path = Some(normalized_path.clone());
194
195 let is_conversion = (normalized_path.starts_with("/v1/responses") || normalized_path.starts_with("/responses")) && method == "POST";
197 ctx.flags.is_conversion_request = is_conversion;
198
199 if is_conversion {
200 debug!("[REQUEST] {} {} -> {} (CONVERSION)", method, normalized_path, "conversion");
201 }
202
203 ctx.route.selected_backend = Some(backend.clone());
204 ctx.route.provider_name = Some(backend.name.clone());
205
206 debug!("[REQUEST] {} {} -> {}", method, normalized_path, backend.name);
207
208 Ok(false)
210 }
211
212 async fn upstream_peer(
214 &self,
215 _session: &mut Session,
216 ctx: &mut Self::CTX,
217 ) -> PingoraResult<Box<HttpPeer>> {
218 let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
219 error!("No backend selected");
220 pingora_core::Error::new_str("No backend selected")
221 })?;
222
223 let mut peer = HttpPeer::new(
224 (backend.host.as_str(), backend.port),
225 backend.use_tls,
226 backend.host.clone(),
227 );
228
229 peer.options.alpn = ALPN::H2H1;
231
232 peer.options.connection_timeout = Some(Duration::from_secs(10));
234 peer.options.total_connection_timeout = Some(Duration::from_secs(30));
235 peer.options.idle_timeout = Some(Duration::from_secs(90));
236 peer.options.tcp_keepalive = Some(TcpKeepalive {
237 idle: Duration::from_secs(60),
238 interval: Duration::from_secs(5),
239 count: 5,
240 });
241
242 if backend.use_tls {
243 peer.options.h2_ping_interval = Some(Duration::from_secs(30));
244 }
245
246 Ok(Box::new(peer))
247 }
248
249 async fn upstream_request_filter(
251 &self,
252 session: &mut Session,
253 upstream_request: &mut RequestHeader,
254 ctx: &mut Self::CTX,
255 ) -> PingoraResult<()> {
256 let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
257 error!("No backend selected");
258 pingora_core::Error::new_str("No backend selected")
259 })?;
260
261 let original_uri = session.req_header().uri.clone();
262 let path = original_uri.path().to_string();
263 let query = original_uri.query();
264 let normalized_path = ctx.route.normalized_path.as_deref().unwrap_or(path.as_str());
265
266 let is_conversion_request = (normalized_path.starts_with("/v1/responses")
268 || normalized_path.starts_with("/responses"))
269 && upstream_request.method.as_str() == "POST";
270
271 let chat_api_path = if is_conversion_request || normalized_path.starts_with("/v1/chat/completions") {
274 if let Some(provider) = self.get_provider(&backend.name) {
276 provider.chat_completions_path()
277 } else {
278 "/chat/completions".to_string()
280 }
281 } else {
282 normalized_path.to_string()
283 };
284
285 let new_path = if !backend.base_path.is_empty() {
287 format!("{}{}", backend.base_path, chat_api_path)
288 } else {
289 chat_api_path
290 };
291
292 let new_uri_str = if let Some(q) = query {
294 format!("{}?{}", new_path, q)
295 } else {
296 new_path.clone()
297 };
298
299 let new_uri: http::Uri = new_uri_str.parse().map_err(|e| {
300 error!("URI rewrite failed: {}", e);
301 pingora_core::Error::new_str("URI rewrite failed")
302 })?;
303 upstream_request.set_uri(new_uri);
304 ctx.route.rewritten_path = Some(new_path);
305
306 upstream_request.remove_header("x-api-key");
308 upstream_request.remove_header("authorization");
309
310 if is_conversion_request {
313 upstream_request.remove_header("content-length");
314 debug!("[UPSTREAM] Removed content-length for body transformation");
315 }
316
317 upstream_request
319 .insert_header("authorization", format!("Bearer {}", backend.api_key))
320 .map_err(|e| {
321 error!("Failed to inject authorization header: {}", e);
322 pingora_core::Error::new_str("Header injection failed")
323 })?;
324
325 upstream_request
327 .insert_header("host", &backend.host)
328 .map_err(|e| {
329 error!("Failed to set host header: {}", e);
330 pingora_core::Error::new_str("Host header failed")
331 })?;
332
333 debug!(
334 "[UPSTREAM] {} {} -> {}",
335 upstream_request.method.as_str(),
336 upstream_request.uri,
337 backend.name
338 );
339
340 Ok(())
341 }
342
343 async fn request_body_filter(
349 &self,
350 _session: &mut Session,
351 body: &mut Option<Bytes>,
352 end_of_stream: bool,
353 ctx: &mut Self::CTX,
354 ) -> PingoraResult<()>
355 where
356 Self::CTX: Send + Sync,
357 {
358 if ctx.flags.is_conversion_request {
361 if let Some(b) = body {
363 if ctx.buffers.request_body.len() + b.len() > MAX_REQUEST_BODY_SIZE {
364 error!("[BODY] Request body exceeds maximum size limit of {} bytes", MAX_REQUEST_BODY_SIZE);
365 return Err(pingora_core::Error::new_str("Request body too large"));
366 }
367 ctx.buffers.request_body.extend_from_slice(b);
368 debug!("[BODY] Buffered {} bytes (total: {})", b.len(), ctx.buffers.request_body.len());
369 }
370
371 *body = Some(Bytes::new());
374
375 if end_of_stream {
377 debug!("[BODY] Conversion request complete, {} bytes buffered", ctx.buffers.request_body.len());
378
379 match self.try_convert_request_body(ctx) {
380 Ok(converted) => {
381 if self.log_body {
382 debug!(
383 "[CONVERTED REQUEST] {}",
384 String::from_utf8_lossy(&converted)
385 );
386 }
387 let path = self.log_dir.join("converted_request.json");
388 if std::fs::write(&path, &converted).is_ok() {
389 debug!("[CONVERTED REQUEST SAVED] to {}", path.display());
390 }
391 debug!("[BODY] Sending converted body: {} bytes", converted.len());
392 *body = Some(Bytes::from(converted));
393 }
394 Err(e) => {
395 error!("[BODY] Conversion failed; aborting upstream: {}", e);
396 let path = self.log_dir.join("codex_request_body.json");
397 let _ = std::fs::write(&path, &ctx.buffers.request_body);
398 return Err(pingora_core::Error::explain(
399 pingora_core::ErrorType::HTTPStatus(400),
400 format!("proxy conversion failed: {e}"),
401 ));
402 }
403 }
404 }
405
406 return Ok(());
407 }
408
409 if let Some(b) = body {
411 ctx.buffers.request_body.extend_from_slice(b);
412 }
413
414 if end_of_stream
416 && !ctx.buffers.request_body.is_empty()
417 && let Ok(json) = serde_json::from_slice::<serde_json::Value>(&ctx.buffers.request_body)
418 {
419 ctx.init_from_passthrough_json(&json);
421 }
422
423 Ok(())
424 }
425
426 async fn response_filter(
428 &self,
429 _session: &mut Session,
430 upstream_response: &mut ResponseHeader,
431 ctx: &mut Self::CTX,
432 ) -> PingoraResult<()> {
433 let status = upstream_response.status.as_u16();
434 let content_type = upstream_response
435 .headers
436 .get("content-type")
437 .and_then(|v| v.to_str().ok())
438 .unwrap_or("")
439 .to_string();
440 let is_sse = content_type.to_ascii_lowercase().contains("text/event-stream");
441 let is_success = (200..300).contains(&status);
442
443 ctx.diagnostics.upstream_status = Some(status);
444 ctx.diagnostics.upstream_content_type = Some(content_type.clone());
445 ctx.flags.should_convert_stream_response =
446 ctx.flags.is_streaming && ctx.flags.is_conversion_request && is_success && is_sse;
447
448 if ctx.flags.is_conversion_request {
449 upstream_response.remove_header("content-length");
450 debug!(
451 "[RESPONSE] removed content-length for conversion response (status={}, content_type={})",
452 status,
453 content_type
454 );
455 }
456
457 if ctx.flags.is_streaming && ctx.flags.is_conversion_request && !ctx.flags.should_convert_stream_response {
458 warn!(
459 "[RESPONSE] bypass stream conversion: status={}, content_type='{}', reason={}",
460 status,
461 content_type,
462 if !is_success {
463 "upstream_non_2xx"
464 } else if !is_sse {
465 "upstream_not_sse"
466 } else {
467 "unknown"
468 }
469 );
470 }
471
472 debug!(
473 "[RESPONSE] status={}, is_streaming={}, is_conversion={}, should_convert_stream={}",
474 status,
475 ctx.flags.is_streaming,
476 ctx.flags.is_conversion_request,
477 ctx.flags.should_convert_stream_response
478 );
479 Ok(())
480 }
481
482 fn response_body_filter(
484 &self,
485 _session: &mut Session,
486 body: &mut Option<Bytes>,
487 end_of_body: bool,
488 ctx: &mut Self::CTX,
489 ) -> PingoraResult<Option<Duration>>
490 where
491 Self::CTX: Send + Sync,
492 {
493 let body_clone = body.clone();
495
496 debug!(
497 "[RESPONSE_BODY] len={:?}, end={}, is_streaming={}, is_conversion={}",
498 body_clone.as_ref().map(|b| b.len()),
499 end_of_body,
500 ctx.flags.is_streaming,
501 ctx.flags.is_conversion_request
502 );
503
504 if let Some(b) = body_clone.as_ref() {
505 if !ctx.flags.is_streaming
509 && ctx.flags.is_conversion_request
510 && ctx.buffers.response_body.len() + b.len() > MAX_RESPONSE_BODY_SIZE
511 {
512 warn!(
513 "[RESPONSE_BODY] Response body exceeds maximum size limit of {} bytes",
514 MAX_RESPONSE_BODY_SIZE
515 );
516 } else {
517 ctx.buffers.response_body.extend_from_slice(b);
518 }
519
520 if !ctx.flags.is_streaming && ctx.flags.is_conversion_request && !end_of_body {
523 *body = Some(Bytes::new());
524 }
525
526 if ctx.flags.should_convert_stream_response {
528 *body = Some(Bytes::new());
531
532 let provider = ctx.route.provider_name.as_ref()
534 .and_then(|name| self.get_provider(name));
535
536 let mut handler = StreamingResponseHandler::new(
538 ctx,
539 provider,
540 self.log_body,
541 self.conversation_store.clone(),
542 );
543
544 if let Some(converted) = handler.process_stream_frame() {
546 *body = Some(Bytes::from(converted));
547 }
548
549 if end_of_body {
551 let completed_events = handler.finalize_stream();
552 if !completed_events.is_empty() {
553 let existing = std::str::from_utf8(body.as_ref().unwrap_or(&Bytes::new()))
554 .unwrap_or("")
555 .to_string();
556 let combined = format!("{}{}", existing, completed_events.join(""));
557 *body = Some(Bytes::from(combined));
558 }
559 }
560 }
561 }
562
563 if end_of_body {
564 let duration_ms = ctx.start_time.elapsed().as_millis() as u64;
565
566 if !ctx.flags.is_streaming && ctx.flags.is_conversion_request && !ctx.buffers.response_body.is_empty() {
572 let text = std::str::from_utf8(&ctx.buffers.response_body).map_err(|e| {
573 error!("[RESPONSE_BODY] upstream body not valid UTF-8: {}", e);
574 pingora_core::Error::explain(
575 pingora_core::ErrorType::HTTPStatus(502),
576 format!("upstream response is not valid UTF-8: {e}"),
577 )
578 })?;
579 let chat_resp: crate::types::chat_api::ChatResponse =
580 serde_json::from_str(text).map_err(|e| {
581 error!("[RESPONSE_BODY] failed to parse upstream ChatResponse: {}", e);
582 pingora_core::Error::explain(
583 pingora_core::ErrorType::HTTPStatus(502),
584 format!("upstream response not a valid Chat completion: {e}"),
585 )
586 })?;
587 let assistant_message = chat_resp.choices.first().map(|c| c.message.clone());
588 let request_context = ctx
589 .stream_state
590 .as_ref()
591 .and_then(|s| s.request_context.as_ref());
592 let response_obj =
593 crate::convert::chat_to_response_with_context(chat_resp, request_context)
594 .map_err(|e| {
595 error!("[RESPONSE_BODY] failed to convert response: {}", e);
596 pingora_core::Error::explain(
597 pingora_core::ErrorType::HTTPStatus(500),
598 format!("proxy response conversion failed: {e}"),
599 )
600 })?;
601 let converted = serde_json::to_vec(&response_obj).map_err(|e| {
602 error!("[RESPONSE_BODY] failed to serialize converted response: {}", e);
603 pingora_core::Error::explain(
604 pingora_core::ErrorType::HTTPStatus(500),
605 format!("proxy response serialization failed: {e}"),
606 )
607 })?;
608 if self.log_body {
609 debug!(
610 "[CONVERTED RESPONSE] {}",
611 String::from_utf8_lossy(&converted)
612 );
613 }
614 *body = Some(Bytes::from(converted));
615 if let (Some(mut messages), Some(assistant_message)) = (
616 ctx.follow_up.pending_conversation_messages.clone(),
617 assistant_message,
618 ) {
619 messages.push(assistant_message);
620 self.store_conversation(
621 response_obj.id.clone(),
622 ConversationSnapshot {
623 instructions: ctx.follow_up.pending_instructions.clone(),
624 messages,
625 },
626 );
627 }
628 }
629
630 info!(
631 "[DONE] provider={}, model={:?}, duration={}ms",
632 ctx.route.provider_name.as_deref().unwrap_or("unknown"),
633 ctx.model.as_ref(),
634 duration_ms
635 );
636 }
637
638 Ok(None)
639 }
640
641 async fn connected_to_upstream(
643 &self,
644 _session: &mut Session,
645 reused: bool,
646 peer: &HttpPeer,
647 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
648 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
649 digest: Option<&Digest>,
650 ctx: &mut Self::CTX,
651 ) -> PingoraResult<()> {
652 let tls_version = digest
653 .and_then(|d| d.ssl_digest.as_ref())
654 .map(|ssl| ssl.version.to_string())
655 .unwrap_or_else(|| "none".to_string());
656
657 let use_tls = ctx.route.selected_backend.as_ref().map(|b| b.use_tls).unwrap_or(false);
658 let backend_name = ctx.route.provider_name.as_deref().unwrap_or("unknown");
659
660 info!(
661 "[CONNECT] {} -> {} (backend={}, TLS={}, reused={}, tls_version={})",
662 peer.sni(),
663 peer.address(),
664 backend_name,
665 use_tls,
666 reused,
667 tls_version
668 );
669
670 Ok(())
671 }
672
673 fn error_while_proxy(
675 &self,
676 peer: &HttpPeer,
677 _session: &mut Session,
678 e: Box<pingora_core::Error>,
679 ctx: &mut Self::CTX,
680 _client_reused: bool,
681 ) -> Box<pingora_core::Error> {
682 error!(
683 "[ERROR] proxy error to {}: {}",
684 peer.address(),
685 e
686 );
687
688 let mut e = e.more_context(format!("Provider: {}", ctx.route.provider_name.as_deref().unwrap_or("unknown")));
689 e.retry.decide_reuse(false);
690 e
691 }
692
693 async fn fail_to_proxy(
695 &self,
696 session: &mut Session,
697 e: &pingora_core::Error,
698 ctx: &mut Self::CTX,
699 ) -> pingora_proxy::FailToProxy
700 where
701 Self::CTX: Send + Sync,
702 {
703 let code = match *e.etype() {
704 pingora_core::ErrorType::ConnectTimedout => 504,
705 pingora_core::ErrorType::ConnectRefused => 502,
706 pingora_core::ErrorType::TLSHandshakeFailure => 502,
707 _ => 502,
708 };
709
710 let method = session.req_header().method.as_str();
711 let uri = &session.req_header().uri;
712
713 error!(
714 "[FAIL] {} {} -> {} (provider: {}, model: {:?}): {}",
715 method,
716 uri,
717 code,
718 ctx.route.provider_name.as_deref().unwrap_or("unknown"),
719 ctx.model.as_ref(),
720 e
721 );
722
723 let error_body = serde_json::json!({
725 "error": {
726 "type": "proxy_error",
727 "code": code,
728 "message": e.to_string()
729 }
730 })
731 .to_string();
732 if let Ok(mut resp) = pingora_http::ResponseHeader::build(code, None) {
733 let _ = resp.insert_header("content-type", "application/json");
734 let _ = resp.insert_header("content-length", error_body.len().to_string());
735 let _ = session.write_response_header(Box::new(resp), false).await;
736 let _ = session
737 .write_response_body(Some(bytes::Bytes::from(error_body)), true)
738 .await;
739 }
740
741 pingora_proxy::FailToProxy {
742 error_code: code,
743 can_reuse_downstream: false,
744 }
745 }
746}
747
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::config::{BackendConfig, MatchRules};
    use crate::providers::GLMProvider;
    use crate::providers::Provider;
    use crate::types::chat_api::{ChatMessage, Content, MessageRole};

    use super::CodexProxy;

    /// Builds a proxy with one default-matching `glm` backend and the given
    /// `log_body` setting.
    fn build_proxy(log_body: bool) -> CodexProxy {
        let glm_backend = BackendConfig {
            name: "glm".to_string(),
            url: "https://api.example.com".to_string(),
            api_key: "test-key".to_string(),
            protocol: "openai".to_string(),
            model: None,
            match_rules: MatchRules {
                default: true,
                ..Default::default()
            },
        };
        let router = Arc::new(crate::config::BackendRouter::new(vec![glm_backend]).unwrap());

        let mut providers = HashMap::new();
        providers.insert("glm".to_string(), Arc::new(GLMProvider) as Arc<dyn Provider>);

        CodexProxy::new(router, providers, log_body, std::path::PathBuf::from("logs"))
    }

    /// Proxy fixture with body logging disabled.
    fn make_test_proxy() -> CodexProxy {
        build_proxy(false)
    }

    /// Context for a conversion request routed to the default backend, with
    /// `raw_body` already buffered.
    fn conversion_ctx(proxy: &CodexProxy, raw_body: &[u8]) -> super::ProxyContext {
        let mut ctx = super::ProxyContext::new();
        ctx.flags.is_conversion_request = true;
        ctx.route.selected_backend = proxy
            .router
            .select_with_config("/", &[])
            .map(|(_, info)| info.clone());
        ctx.buffers.request_body = raw_body.to_vec();
        ctx
    }

    /// Plain-text message with every optional field unset.
    fn msg(role: MessageRole, text: &str) -> ChatMessage {
        ChatMessage {
            role,
            content: Content::String(text.to_string()),
            name: None,
            annotations: None,
            tool_calls: None,
            function_call: None,
            tool_call_id: None,
            refusal: None,
        }
    }

    #[test]
    fn test_proxy_creation() {
        let proxy = build_proxy(true);
        assert!(proxy.log_body);
    }

    #[test]
    fn test_try_convert_request_body_rejects_malformed_json() {
        let proxy = make_test_proxy();
        let mut ctx = conversion_ctx(&proxy, b"{not valid json");

        let err = proxy
            .try_convert_request_body(&mut ctx)
            .expect_err("must fail on invalid JSON");

        assert!(matches!(err, crate::error::ConversionError::JsonError(_)));
    }

    #[test]
    fn test_try_convert_request_body_succeeds_on_minimal_request() {
        let proxy = make_test_proxy();
        let mut ctx = conversion_ctx(&proxy, br#"{"model":"glm-4","input":"hi"}"#);

        let bytes = proxy
            .try_convert_request_body(&mut ctx)
            .expect("conversion should succeed");

        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(json["model"], "glm-4");
        assert!(json["messages"].is_array());
    }

    #[test]
    fn test_merge_history_messages_prefers_previous_and_appends_incremental() {
        let shared_prefix = || {
            vec![
                msg(MessageRole::System, "You are helpful"),
                msg(MessageRole::User, "hello"),
                msg(MessageRole::Assistant, "hi"),
            ]
        };
        let history = shared_prefix();
        let mut current = shared_prefix();
        current.push(msg(MessageRole::User, "next question"));

        let merged = super::merge_history_messages(history, current);

        assert_eq!(merged.len(), 4);
        assert_eq!(merged[3].content.as_text(), "next question");
    }

    #[test]
    fn test_merge_history_messages_when_no_overlap_appends_all_current() {
        let history = vec![
            msg(MessageRole::System, "system"),
            msg(MessageRole::Assistant, "a1"),
        ];
        let current = vec![msg(MessageRole::User, "new question")];

        let merged = super::merge_history_messages(history, current);

        assert_eq!(merged.len(), 3);
        assert_eq!(merged[2].content.as_text(), "new question");
    }
}