1use std::time::Duration;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use pingora_core::protocols::Digest;
8use pingora_core::protocols::TcpKeepalive;
9use pingora_core::Result as PingoraResult;
10use pingora_core::upstreams::peer::{ALPN, HttpPeer, Peer};
11use pingora_http::{RequestHeader, ResponseHeader};
12use pingora_proxy::{ProxyHttp, Session};
13use tracing::{debug, error, info, warn};
14
15use crate::constants::*;
16use crate::convert::{ResponseRequestContext, response_to_chat};
17use crate::proxy::context_store::ConversationSnapshot;
18use crate::types::chat_api::{ChatMessage, MessageRole};
19use crate::types::response_api::ResponseRequest;
20
21use super::context::ProxyContext;
22use super::core::CodexProxy;
23use crate::proxy::streaming_handler::StreamingResponseHandler;
24
25impl CodexProxy {
26 fn try_convert_request_body(
39 &self,
40 ctx: &mut ProxyContext,
41 ) -> Result<Vec<u8>, crate::error::ConversionError> {
42 use crate::error::ConversionError;
43
44 let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
45 ConversionError::ProviderError("no backend selected".to_string())
46 })?;
47 let model_override = backend.model.clone();
48 let provider = self.get_provider(&backend.name).ok_or_else(|| {
49 ConversionError::ProviderError(format!(
50 "no provider registered for backend '{}'",
51 backend.name
52 ))
53 })?;
54
55 let mut response_req: ResponseRequest =
56 serde_json::from_slice(&ctx.buffers.request_body)?;
57 ctx.init_from_response_request(&response_req);
58
59 let mut previous_messages: Option<Vec<ChatMessage>> = None;
61 if let Some(prev_id) = response_req.previous_response_id.clone() {
62 if let Some(snapshot) = self.get_conversation(&prev_id) {
63 if matches!(
64 &response_req.input,
65 crate::types::response_api::InputItemOrString::Array(_)
66 ) {
67 debug!(
68 "[REQUEST_CONVERT] previous_response_id + input[] detected, applying prefer-previous merge policy"
69 );
70 }
71 if response_req.instructions.is_none() {
72 response_req.instructions = snapshot.instructions.clone();
73 }
74 previous_messages = Some(snapshot.messages);
75 } else {
76 warn!(
77 "[REQUEST_CONVERT] previous_response_id not found in context store: {}",
78 prev_id
79 );
80 }
81 }
82
83 let context = ResponseRequestContext::from(&response_req);
84 ctx.set_response_request_context(context);
85
86 let mut chat_req = response_to_chat(
87 response_req,
88 provider.as_ref(),
89 model_override.as_deref(),
90 )?;
91
92 if let Some(history) = previous_messages {
93 chat_req.messages = merge_history_messages(history, chat_req.messages);
94 }
95
96 ctx.follow_up.pending_instructions = chat_req
97 .messages
98 .iter()
99 .find(|m| m.role == MessageRole::System)
100 .map(|m| m.content.as_text());
101 ctx.follow_up.pending_conversation_messages = Some(chat_req.messages.clone());
102
103 serde_json::to_vec(&chat_req).map_err(ConversionError::from)
104 }
105}
106
107fn merge_history_messages(
108 mut history: Vec<ChatMessage>,
109 current_turn_messages: Vec<ChatMessage>,
110) -> Vec<ChatMessage> {
111 let mut overlap = 0usize;
114 while overlap < history.len() && overlap < current_turn_messages.len() {
115 let same = serde_json::to_value(&history[overlap]).ok()
116 == serde_json::to_value(¤t_turn_messages[overlap]).ok();
117 if !same {
118 break;
119 }
120 overlap += 1;
121 }
122
123 if overlap > 0 {
124 debug!(
125 "[REQUEST_CONVERT] detected {} overlapping history messages, appending incremental suffix only",
126 overlap
127 );
128 } else if !current_turn_messages.is_empty() {
129 debug!(
130 "[REQUEST_CONVERT] no overlap with cached history, appending all current messages as incremental"
131 );
132 }
133
134 history.extend(current_turn_messages.into_iter().skip(overlap));
135 history
136}
137
138#[async_trait]
139impl ProxyHttp for CodexProxy {
140 type CTX = ProxyContext;
141
    /// Builds the per-request context by calling `ProxyContext::new()`.
    fn new_ctx(&self) -> Self::CTX {
        ProxyContext::new()
    }
145
146 async fn request_filter(
148 &self,
149 session: &mut Session,
150 ctx: &mut Self::CTX,
151 ) -> PingoraResult<bool>
152 where
153 Self::CTX: Send + Sync,
154 {
155 let method = session.req_header().method.as_str().to_string();
156 let path = session.req_header().uri.path().to_string();
157
158 let headers: Vec<(String, String)> = session
160 .req_header()
161 .headers
162 .iter()
163 .map(|(name, value)| {
164 (
165 name.as_str().to_string(),
166 value.to_str().unwrap_or("<binary>").to_string(),
167 )
168 })
169 .collect();
170
171 let (backend_config, backend) = match self.router.select_with_config(&path, &headers) {
173 Some(pair) => pair,
174 None => {
175 warn!("[REQUEST] No matching backend for path: {}", path);
176 return Err(pingora_core::Error::new_str("No matching backend"));
177 }
178 };
179
180 let normalized_path = if let Some(prefix) = backend_config.match_rules.path_prefix.as_deref() {
181 let stripped = path.strip_prefix(prefix).unwrap_or(path.as_str());
182 if stripped.is_empty() {
183 "/".to_string()
184 } else if stripped.starts_with('/') {
185 stripped.to_string()
186 } else {
187 format!("/{}", stripped)
188 }
189 } else {
190 path.clone()
191 };
192 ctx.route.normalized_path = Some(normalized_path.clone());
193
194 let is_conversion = (normalized_path.starts_with("/v1/responses") || normalized_path.starts_with("/responses")) && method == "POST";
196 ctx.flags.is_conversion_request = is_conversion;
197
198 if is_conversion {
199 debug!("[REQUEST] {} {} -> {} (CONVERSION)", method, normalized_path, "conversion");
200 }
201
202 ctx.route.selected_backend = Some(backend.clone());
203 ctx.route.provider_name = Some(backend.name.clone());
204
205 debug!("[REQUEST] {} {} -> {}", method, normalized_path, backend.name);
206
207 Ok(false)
209 }
210
211 async fn upstream_peer(
213 &self,
214 _session: &mut Session,
215 ctx: &mut Self::CTX,
216 ) -> PingoraResult<Box<HttpPeer>> {
217 let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
218 error!("No backend selected");
219 pingora_core::Error::new_str("No backend selected")
220 })?;
221
222 let mut peer = HttpPeer::new(
223 (backend.host.as_str(), backend.port),
224 backend.use_tls,
225 backend.host.clone(),
226 );
227
228 peer.options.alpn = ALPN::H2H1;
230
231 peer.options.connection_timeout = Some(Duration::from_secs(10));
233 peer.options.total_connection_timeout = Some(Duration::from_secs(30));
234 peer.options.idle_timeout = Some(Duration::from_secs(90));
235 peer.options.tcp_keepalive = Some(TcpKeepalive {
236 idle: Duration::from_secs(60),
237 interval: Duration::from_secs(5),
238 count: 5,
239 });
240
241 if backend.use_tls {
242 peer.options.h2_ping_interval = Some(Duration::from_secs(30));
243 }
244
245 Ok(Box::new(peer))
246 }
247
248 async fn upstream_request_filter(
250 &self,
251 session: &mut Session,
252 upstream_request: &mut RequestHeader,
253 ctx: &mut Self::CTX,
254 ) -> PingoraResult<()> {
255 let backend = ctx.route.selected_backend.as_ref().ok_or_else(|| {
256 error!("No backend selected");
257 pingora_core::Error::new_str("No backend selected")
258 })?;
259
260 let original_uri = session.req_header().uri.clone();
261 let path = original_uri.path().to_string();
262 let query = original_uri.query();
263 let normalized_path = ctx.route.normalized_path.as_deref().unwrap_or(path.as_str());
264
265 let is_conversion_request = (normalized_path.starts_with("/v1/responses")
267 || normalized_path.starts_with("/responses"))
268 && upstream_request.method.as_str() == "POST";
269
270 let chat_api_path = if is_conversion_request || normalized_path.starts_with("/v1/chat/completions") {
273 if let Some(provider) = self.get_provider(&backend.name) {
275 provider.chat_completions_path()
276 } else {
277 "/v1/chat/completions".to_string()
279 }
280 } else {
281 normalized_path.to_string()
282 };
283
284 let new_path = if !backend.base_path.is_empty() {
286 format!("{}{}", backend.base_path, chat_api_path)
287 } else {
288 chat_api_path
289 };
290
291 let new_uri_str = if let Some(q) = query {
293 format!("{}?{}", new_path, q)
294 } else {
295 new_path.clone()
296 };
297
298 let new_uri: http::Uri = new_uri_str.parse().map_err(|e| {
299 error!("URI rewrite failed: {}", e);
300 pingora_core::Error::new_str("URI rewrite failed")
301 })?;
302 upstream_request.set_uri(new_uri);
303 ctx.route.rewritten_path = Some(new_path);
304
305 upstream_request.remove_header("x-api-key");
307 upstream_request.remove_header("authorization");
308
309 if is_conversion_request {
312 upstream_request.remove_header("content-length");
313 debug!("[UPSTREAM] Removed content-length for body transformation");
314 }
315
316 upstream_request
318 .insert_header("authorization", format!("Bearer {}", backend.api_key))
319 .map_err(|e| {
320 error!("Failed to inject authorization header: {}", e);
321 pingora_core::Error::new_str("Header injection failed")
322 })?;
323
324 upstream_request
326 .insert_header("host", &backend.host)
327 .map_err(|e| {
328 error!("Failed to set host header: {}", e);
329 pingora_core::Error::new_str("Host header failed")
330 })?;
331
332 debug!(
333 "[UPSTREAM] {} {} -> {}",
334 upstream_request.method.as_str(),
335 upstream_request.uri,
336 backend.name
337 );
338
339 Ok(())
340 }
341
342 async fn request_body_filter(
348 &self,
349 _session: &mut Session,
350 body: &mut Option<Bytes>,
351 end_of_stream: bool,
352 ctx: &mut Self::CTX,
353 ) -> PingoraResult<()>
354 where
355 Self::CTX: Send + Sync,
356 {
357 if ctx.flags.is_conversion_request {
360 if let Some(b) = body {
362 if ctx.buffers.request_body.len() + b.len() > MAX_REQUEST_BODY_SIZE {
363 error!("[BODY] Request body exceeds maximum size limit of {} bytes", MAX_REQUEST_BODY_SIZE);
364 return Err(pingora_core::Error::new_str("Request body too large"));
365 }
366 ctx.buffers.request_body.extend_from_slice(b);
367 debug!("[BODY] Buffered {} bytes (total: {})", b.len(), ctx.buffers.request_body.len());
368 }
369
370 *body = Some(Bytes::new());
373
374 if end_of_stream {
376 debug!("[BODY] Conversion request complete, {} bytes buffered", ctx.buffers.request_body.len());
377
378 match self.try_convert_request_body(ctx) {
379 Ok(converted) => {
380 if self.log_body {
381 debug!(
382 "[CONVERTED REQUEST] {}",
383 String::from_utf8_lossy(&converted)
384 );
385 }
386 let path = self.log_dir.join("converted_request.json");
387 if std::fs::write(&path, &converted).is_ok() {
388 debug!("[CONVERTED REQUEST SAVED] to {}", path.display());
389 }
390 debug!("[BODY] Sending converted body: {} bytes", converted.len());
391 *body = Some(Bytes::from(converted));
392 }
393 Err(e) => {
394 error!("[BODY] Conversion failed; aborting upstream: {}", e);
395 let path = self.log_dir.join("codex_request_body.json");
396 let _ = std::fs::write(&path, &ctx.buffers.request_body);
397 return Err(pingora_core::Error::explain(
398 pingora_core::ErrorType::HTTPStatus(400),
399 format!("proxy conversion failed: {e}"),
400 ));
401 }
402 }
403 }
404
405 return Ok(());
406 }
407
408 if let Some(b) = body {
410 ctx.buffers.request_body.extend_from_slice(b);
411 }
412
413 if end_of_stream
415 && !ctx.buffers.request_body.is_empty()
416 && let Ok(json) = serde_json::from_slice::<serde_json::Value>(&ctx.buffers.request_body)
417 {
418 ctx.init_from_passthrough_json(&json);
420 }
421
422 Ok(())
423 }
424
425 async fn response_filter(
427 &self,
428 _session: &mut Session,
429 upstream_response: &mut ResponseHeader,
430 ctx: &mut Self::CTX,
431 ) -> PingoraResult<()> {
432 let status = upstream_response.status.as_u16();
433 let content_type = upstream_response
434 .headers
435 .get("content-type")
436 .and_then(|v| v.to_str().ok())
437 .unwrap_or("")
438 .to_string();
439 let is_sse = content_type.to_ascii_lowercase().contains("text/event-stream");
440 let is_success = (200..300).contains(&status);
441
442 ctx.diagnostics.upstream_status = Some(status);
443 ctx.diagnostics.upstream_content_type = Some(content_type.clone());
444 ctx.flags.should_convert_stream_response =
445 ctx.flags.is_streaming && ctx.flags.is_conversion_request && is_success && is_sse;
446
447 if ctx.flags.is_conversion_request {
448 upstream_response.remove_header("content-length");
449 debug!(
450 "[RESPONSE] removed content-length for conversion response (status={}, content_type={})",
451 status,
452 content_type
453 );
454 }
455
456 if ctx.flags.is_streaming && ctx.flags.is_conversion_request && !ctx.flags.should_convert_stream_response {
457 warn!(
458 "[RESPONSE] bypass stream conversion: status={}, content_type='{}', reason={}",
459 status,
460 content_type,
461 if !is_success {
462 "upstream_non_2xx"
463 } else if !is_sse {
464 "upstream_not_sse"
465 } else {
466 "unknown"
467 }
468 );
469 }
470
471 debug!(
472 "[RESPONSE] status={}, is_streaming={}, is_conversion={}, should_convert_stream={}",
473 status,
474 ctx.flags.is_streaming,
475 ctx.flags.is_conversion_request,
476 ctx.flags.should_convert_stream_response
477 );
478 Ok(())
479 }
480
481 fn response_body_filter(
483 &self,
484 _session: &mut Session,
485 body: &mut Option<Bytes>,
486 end_of_body: bool,
487 ctx: &mut Self::CTX,
488 ) -> PingoraResult<Option<Duration>>
489 where
490 Self::CTX: Send + Sync,
491 {
492 let body_clone = body.clone();
494
495 debug!(
496 "[RESPONSE_BODY] len={:?}, end={}, is_streaming={}, is_conversion={}",
497 body_clone.as_ref().map(|b| b.len()),
498 end_of_body,
499 ctx.flags.is_streaming,
500 ctx.flags.is_conversion_request
501 );
502
503 if let Some(b) = body_clone.as_ref() {
504 if !ctx.flags.is_streaming
508 && ctx.flags.is_conversion_request
509 && ctx.buffers.response_body.len() + b.len() > MAX_RESPONSE_BODY_SIZE
510 {
511 warn!(
512 "[RESPONSE_BODY] Response body exceeds maximum size limit of {} bytes",
513 MAX_RESPONSE_BODY_SIZE
514 );
515 } else {
516 ctx.buffers.response_body.extend_from_slice(b);
517 }
518
519 if !ctx.flags.is_streaming && ctx.flags.is_conversion_request && !end_of_body {
522 *body = Some(Bytes::new());
523 }
524
525 if ctx.flags.should_convert_stream_response {
527 *body = Some(Bytes::new());
530
531 let provider = ctx.route.provider_name.as_ref()
533 .and_then(|name| self.get_provider(name));
534
535 let mut handler = StreamingResponseHandler::new(
537 ctx,
538 provider,
539 self.log_body,
540 self.conversation_store.clone(),
541 );
542
543 if let Some(converted) = handler.process_stream_frame() {
545 *body = Some(Bytes::from(converted));
546 }
547
548 if end_of_body {
550 let completed_events = handler.finalize_stream();
551 if !completed_events.is_empty() {
552 let existing = std::str::from_utf8(body.as_ref().unwrap_or(&Bytes::new()))
553 .unwrap_or("")
554 .to_string();
555 let combined = format!("{}{}", existing, completed_events.join(""));
556 *body = Some(Bytes::from(combined));
557 }
558 }
559 }
560 }
561
562 if end_of_body {
563 let duration_ms = ctx.start_time.elapsed().as_millis() as u64;
564
565 if !ctx.flags.is_streaming && ctx.flags.is_conversion_request && !ctx.buffers.response_body.is_empty() {
571 let text = std::str::from_utf8(&ctx.buffers.response_body).map_err(|e| {
572 error!("[RESPONSE_BODY] upstream body not valid UTF-8: {}", e);
573 pingora_core::Error::explain(
574 pingora_core::ErrorType::HTTPStatus(502),
575 format!("upstream response is not valid UTF-8: {e}"),
576 )
577 })?;
578 let chat_resp: crate::types::chat_api::ChatResponse =
579 serde_json::from_str(text).map_err(|e| {
580 error!("[RESPONSE_BODY] failed to parse upstream ChatResponse: {}", e);
581 pingora_core::Error::explain(
582 pingora_core::ErrorType::HTTPStatus(502),
583 format!("upstream response not a valid Chat completion: {e}"),
584 )
585 })?;
586 let assistant_message = chat_resp.choices.first().map(|c| c.message.clone());
587 let request_context = ctx
588 .stream_state
589 .as_ref()
590 .and_then(|s| s.request_context.as_ref());
591 let response_obj =
592 crate::convert::chat_to_response_with_context(chat_resp, request_context)
593 .map_err(|e| {
594 error!("[RESPONSE_BODY] failed to convert response: {}", e);
595 pingora_core::Error::explain(
596 pingora_core::ErrorType::HTTPStatus(500),
597 format!("proxy response conversion failed: {e}"),
598 )
599 })?;
600 let converted = serde_json::to_vec(&response_obj).map_err(|e| {
601 error!("[RESPONSE_BODY] failed to serialize converted response: {}", e);
602 pingora_core::Error::explain(
603 pingora_core::ErrorType::HTTPStatus(500),
604 format!("proxy response serialization failed: {e}"),
605 )
606 })?;
607 if self.log_body {
608 debug!(
609 "[CONVERTED RESPONSE] {}",
610 String::from_utf8_lossy(&converted)
611 );
612 }
613 *body = Some(Bytes::from(converted));
614 if let (Some(mut messages), Some(assistant_message)) = (
615 ctx.follow_up.pending_conversation_messages.clone(),
616 assistant_message,
617 ) {
618 messages.push(assistant_message);
619 self.store_conversation(
620 response_obj.id.clone(),
621 ConversationSnapshot {
622 instructions: ctx.follow_up.pending_instructions.clone(),
623 messages,
624 },
625 );
626 }
627 }
628
629 info!(
630 "[DONE] provider={}, model={:?}, duration={}ms",
631 ctx.route.provider_name.as_deref().unwrap_or("unknown"),
632 ctx.model.as_ref(),
633 duration_ms
634 );
635 }
636
637 Ok(None)
638 }
639
640 async fn connected_to_upstream(
642 &self,
643 _session: &mut Session,
644 reused: bool,
645 peer: &HttpPeer,
646 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
647 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
648 digest: Option<&Digest>,
649 ctx: &mut Self::CTX,
650 ) -> PingoraResult<()> {
651 let tls_version = digest
652 .and_then(|d| d.ssl_digest.as_ref())
653 .map(|ssl| ssl.version.to_string())
654 .unwrap_or_else(|| "none".to_string());
655
656 let use_tls = ctx.route.selected_backend.as_ref().map(|b| b.use_tls).unwrap_or(false);
657 let backend_name = ctx.route.provider_name.as_deref().unwrap_or("unknown");
658
659 info!(
660 "[CONNECT] {} -> {} (backend={}, TLS={}, reused={}, tls_version={})",
661 peer.sni(),
662 peer.address(),
663 backend_name,
664 use_tls,
665 reused,
666 tls_version
667 );
668
669 Ok(())
670 }
671
672 fn error_while_proxy(
674 &self,
675 peer: &HttpPeer,
676 _session: &mut Session,
677 e: Box<pingora_core::Error>,
678 ctx: &mut Self::CTX,
679 _client_reused: bool,
680 ) -> Box<pingora_core::Error> {
681 error!(
682 "[ERROR] proxy error to {}: {}",
683 peer.address(),
684 e
685 );
686
687 let mut e = e.more_context(format!("Provider: {}", ctx.route.provider_name.as_deref().unwrap_or("unknown")));
688 e.retry.decide_reuse(false);
689 e
690 }
691
692 async fn fail_to_proxy(
694 &self,
695 session: &mut Session,
696 e: &pingora_core::Error,
697 ctx: &mut Self::CTX,
698 ) -> pingora_proxy::FailToProxy
699 where
700 Self::CTX: Send + Sync,
701 {
702 let code = match *e.etype() {
703 pingora_core::ErrorType::ConnectTimedout => 504,
704 pingora_core::ErrorType::ConnectRefused => 502,
705 pingora_core::ErrorType::TLSHandshakeFailure => 502,
706 _ => 502,
707 };
708
709 let method = session.req_header().method.as_str();
710 let uri = &session.req_header().uri;
711
712 error!(
713 "[FAIL] {} {} -> {} (provider: {}, model: {:?}): {}",
714 method,
715 uri,
716 code,
717 ctx.route.provider_name.as_deref().unwrap_or("unknown"),
718 ctx.model.as_ref(),
719 e
720 );
721
722 let error_body = serde_json::json!({
724 "error": {
725 "type": "proxy_error",
726 "code": code,
727 "message": e.to_string()
728 }
729 })
730 .to_string();
731 if let Ok(mut resp) = pingora_http::ResponseHeader::build(code, None) {
732 let _ = resp.insert_header("content-type", "application/json");
733 let _ = resp.insert_header("content-length", error_body.len().to_string());
734 let _ = session.write_response_header(Box::new(resp), false).await;
735 let _ = session
736 .write_response_body(Some(bytes::Bytes::from(error_body)), true)
737 .await;
738 }
739
740 pingora_proxy::FailToProxy {
741 error_code: code,
742 can_reuse_downstream: false,
743 }
744 }
745}
746
747#[cfg(test)]
748mod tests {
749 use std::collections::HashMap;
750 use std::sync::Arc;
751
752 use crate::config::{BackendConfig, MatchRules};
753 use crate::types::chat_api::{ChatMessage, Content, MessageRole};
754 use crate::providers::GLMProvider;
755 use crate::providers::Provider;
756
757 use super::CodexProxy;
758
759 fn make_test_proxy() -> CodexProxy {
760 let configs = vec![BackendConfig {
761 name: "glm".to_string(),
762 url: "https://api.example.com".to_string(),
763 api_key: "test-key".to_string(),
764 protocol: "openai".to_string(),
765 model: None,
766 match_rules: MatchRules {
767 default: true,
768 ..Default::default()
769 },
770 }];
771 let router = Arc::new(crate::config::BackendRouter::new(configs).unwrap());
772 let mut providers = HashMap::new();
773 providers.insert("glm".to_string(), Arc::new(GLMProvider) as Arc<dyn Provider>);
774 CodexProxy::new(router, providers, false, std::path::PathBuf::from("logs"))
775 }
776
777 #[test]
778 fn test_proxy_creation() {
779 let configs = vec![BackendConfig {
780 name: "glm".to_string(),
781 url: "https://api.example.com".to_string(),
782 api_key: "test-key".to_string(),
783 protocol: "openai".to_string(),
784 model: None,
785 match_rules: MatchRules {
786 default: true,
787 ..Default::default()
788 },
789 }];
790 let router = Arc::new(crate::config::BackendRouter::new(configs).unwrap());
791 let mut providers = HashMap::new();
792 providers.insert("glm".to_string(), Arc::new(GLMProvider) as Arc<dyn Provider>);
793 let proxy = CodexProxy::new(router, providers, true, std::path::PathBuf::from("logs"));
794 assert!(proxy.log_body);
795 }
796
797 #[test]
798 fn test_try_convert_request_body_rejects_malformed_json() {
799 let proxy = make_test_proxy();
802 let mut ctx = super::ProxyContext::new();
803 ctx.flags.is_conversion_request = true;
804 ctx.route.selected_backend = proxy.router.select_with_config("/", &[]).map(|(_, info)| info.clone());
805 ctx.buffers.request_body = b"{not valid json".to_vec();
806 let err = proxy
807 .try_convert_request_body(&mut ctx)
808 .expect_err("must fail on invalid JSON");
809 assert!(matches!(err, crate::error::ConversionError::JsonError(_)));
810 }
811
812 #[test]
813 fn test_try_convert_request_body_succeeds_on_minimal_request() {
814 let proxy = make_test_proxy();
815 let mut ctx = super::ProxyContext::new();
816 ctx.flags.is_conversion_request = true;
817 ctx.route.selected_backend = proxy.router.select_with_config("/", &[]).map(|(_, info)| info.clone());
818 ctx.buffers.request_body = br#"{"model":"glm-4","input":"hi"}"#.to_vec();
819 let bytes = proxy
820 .try_convert_request_body(&mut ctx)
821 .expect("conversion should succeed");
822 let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
823 assert_eq!(json["model"], "glm-4");
824 assert!(json["messages"].is_array());
825 }
826
827 fn msg(role: MessageRole, text: &str) -> ChatMessage {
828 ChatMessage {
829 role,
830 content: Content::String(text.to_string()),
831 name: None,
832 annotations: None,
833 tool_calls: None,
834 function_call: None,
835 tool_call_id: None,
836 refusal: None,
837 }
838 }
839
840 #[test]
841 fn test_merge_history_messages_prefers_previous_and_appends_incremental() {
842 let history = vec![
843 msg(MessageRole::System, "You are helpful"),
844 msg(MessageRole::User, "hello"),
845 msg(MessageRole::Assistant, "hi"),
846 ];
847 let current = vec![
848 msg(MessageRole::System, "You are helpful"),
849 msg(MessageRole::User, "hello"),
850 msg(MessageRole::Assistant, "hi"),
851 msg(MessageRole::User, "next question"),
852 ];
853
854 let merged = super::merge_history_messages(history, current);
855 assert_eq!(merged.len(), 4);
856 assert_eq!(merged[3].content.as_text(), "next question");
857 }
858
859 #[test]
860 fn test_merge_history_messages_when_no_overlap_appends_all_current() {
861 let history = vec![msg(MessageRole::System, "system"), msg(MessageRole::Assistant, "a1")];
862 let current = vec![msg(MessageRole::User, "new question")];
863
864 let merged = super::merge_history_messages(history, current);
865 assert_eq!(merged.len(), 3);
866 assert_eq!(merged[2].content.as_text(), "new question");
867 }
868}